1#!/usr/bin/python2 2# 3# Copyright (C) 2015 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18"""Unit testing payload_info.py.""" 19 20from __future__ import print_function 21 22import StringIO 23import collections 24import mock 25import sys 26import unittest 27 28import payload_info 29import update_payload 30 31from contextlib import contextmanager 32 33from update_payload import update_metadata_pb2 34 35class FakePayloadError(Exception): 36 """A generic error when using the FakePayload.""" 37 38class FakeOption(object): 39 """Fake options object for testing.""" 40 41 def __init__(self, **kwargs): 42 self.list_ops = False 43 self.stats = False 44 self.signatures = False 45 for key, val in kwargs.iteritems(): 46 setattr(self, key, val) 47 if not hasattr(self, 'payload_file'): 48 self.payload_file = None 49 50class FakeOp(object): 51 """Fake manifest operation for testing.""" 52 53 def __init__(self, src_extents, dst_extents, op_type, **kwargs): 54 self.src_extents = src_extents 55 self.dst_extents = dst_extents 56 self.type = op_type 57 for key, val in kwargs.iteritems(): 58 setattr(self, key, val) 59 60 def HasField(self, field): 61 return hasattr(self, field) 62 63class FakePartition(object): 64 """Fake PartitionUpdate field for testing.""" 65 66 def __init__(self, partition_name, operations): 67 self.partition_name = partition_name 68 self.operations = operations 69 70class FakeManifest(object): 71 """Fake manifest for testing.""" 72 73 def __init__(self, major_version): 74 FakeExtent = collections.namedtuple('FakeExtent', 75 ['start_block', 'num_blocks']) 76 self.install_operations = [FakeOp([], 77 [FakeExtent(1, 1), FakeExtent(2, 2)], 78 update_payload.common.OpType.REPLACE_BZ, 79 dst_length=3*4096, 80 data_offset=1, 81 data_length=1)] 82 self.kernel_install_operations = [FakeOp( 83 [FakeExtent(1, 1)], 84 [FakeExtent(x, x) for x in xrange(20)], 85 update_payload.common.OpType.SOURCE_COPY, 86 src_length=4096)] 87 if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO: 88 self.partitions = [FakePartition('root', self.install_operations), 89 FakePartition('kernel', 90 self.kernel_install_operations)] 91 self.install_operations = self.kernel_install_operations = [] 92 self.block_size = 4096 93 self.minor_version = 4 94 FakePartInfo = collections.namedtuple('FakePartInfo', ['size']) 95 self.old_rootfs_info = FakePartInfo(1 * 4096) 96 self.old_kernel_info = FakePartInfo(2 * 4096) 97 self.new_rootfs_info = FakePartInfo(3 * 4096) 98 self.new_kernel_info = FakePartInfo(4 * 4096) 99 self.signatures_offset = None 100 self.signatures_size = None 101 102 def HasField(self, field_name): 103 """Fake HasField method based on the python members.""" 104 return hasattr(self, field_name) and getattr(self, field_name) is not None 105 106class FakeHeader(object): 107 """Fake payload header for testing.""" 108 109 def __init__(self, version, manifest_len, metadata_signature_len): 110 self.version = version 111 self.manifest_len = manifest_len 112 self.metadata_signature_len = metadata_signature_len 113 114 @property 115 def size(self): 116 return (20 if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS 117 else 24) 118 119class FakePayload(object): 120 """Fake payload for testing.""" 121 122 def __init__(self, major_version): 123 self._header = FakeHeader(major_version, 222, 0) 124 self.header = None 125 self._manifest = FakeManifest(major_version) 126 self.manifest = None 127 128 self._blobs = {} 129 self._payload_signatures = update_metadata_pb2.Signatures() 130 self._metadata_signatures = update_metadata_pb2.Signatures() 131 132 def Init(self): 133 """Fake Init that sets header and manifest. 134 135 Failing to call Init() will not make header and manifest available to the 136 test. 137 """ 138 self.header = self._header 139 self.manifest = self._manifest 140 141 def ReadDataBlob(self, offset, length): 142 """Return the blob that should be present at the offset location""" 143 if not offset in self._blobs: 144 raise FakePayloadError('Requested blob at unknown offset %d' % offset) 145 blob = self._blobs[offset] 146 if len(blob) != length: 147 raise FakePayloadError('Read blob with the wrong length (expect: %d, ' 148 'actual: %d)' % (len(blob), length)) 149 return blob 150 151 @staticmethod 152 def _AddSignatureToProto(proto, **kwargs): 153 """Add a new Signature element to the passed proto.""" 154 new_signature = proto.signatures.add() 155 for key, val in kwargs.iteritems(): 156 setattr(new_signature, key, val) 157 158 def AddPayloadSignature(self, **kwargs): 159 self._AddSignatureToProto(self._payload_signatures, **kwargs) 160 blob = self._payload_signatures.SerializeToString() 161 self._manifest.signatures_offset = 1234 162 self._manifest.signatures_size = len(blob) 163 self._blobs[self._manifest.signatures_offset] = blob 164 165 def AddMetadataSignature(self, **kwargs): 166 self._AddSignatureToProto(self._metadata_signatures, **kwargs) 167 if self._header.metadata_signature_len: 168 del self._blobs[-self._header.metadata_signature_len] 169 blob = self._metadata_signatures.SerializeToString() 170 self._header.metadata_signature_len = len(blob) 171 self._blobs[-len(blob)] = blob 172 173class PayloadCommandTest(unittest.TestCase): 174 """Test class for our PayloadCommand class.""" 175 176 @contextmanager 177 def OutputCapturer(self): 178 """A tool for capturing the sys.stdout""" 179 stdout = sys.stdout 180 try: 181 sys.stdout = StringIO.StringIO() 182 yield sys.stdout 183 finally: 184 sys.stdout = stdout 185 186 def TestCommand(self, payload_cmd, payload, expected_out): 187 """A tool for testing a payload command. 188 189 It tests that a payload command which runs with a given payload produces a 190 correct output. 191 """ 192 with mock.patch.object(update_payload, 'Payload', return_value=payload), \ 193 self.OutputCapturer() as output: 194 payload_cmd.Run() 195 self.assertEquals(output.getvalue(), expected_out) 196 197 def testDisplayValue(self): 198 """Verify that DisplayValue prints what we expect.""" 199 with self.OutputCapturer() as output: 200 payload_info.DisplayValue('key', 'value') 201 self.assertEquals(output.getvalue(), 'key: value\n') 202 203 def testRun(self): 204 """Verify that Run parses and displays the payload like we expect.""" 205 payload_cmd = payload_info.PayloadCommand(FakeOption(action='show')) 206 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) 207 expected_out = """Payload version: 1 208Manifest length: 222 209Number of operations: 1 210Number of kernel ops: 1 211Block size: 4096 212Minor version: 4 213""" 214 self.TestCommand(payload_cmd, payload, expected_out) 215 216 def testListOpsOnVersion1(self): 217 """Verify that the --list_ops option gives the correct output.""" 218 payload_cmd = payload_info.PayloadCommand( 219 FakeOption(list_ops=True, action='show')) 220 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) 221 expected_out = """Payload version: 1 222Manifest length: 222 223Number of operations: 1 224Number of kernel ops: 1 225Block size: 4096 226Minor version: 4 227 228Install operations: 229 0: REPLACE_BZ 230 Data offset: 1 231 Data length: 1 232 Destination: 2 extents (3 blocks) 233 (1,1) (2,2) 234Kernel install operations: 235 0: SOURCE_COPY 236 Source: 1 extent (1 block) 237 (1,1) 238 Destination: 20 extents (190 blocks) 239 (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) 240 (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) 241""" 242 self.TestCommand(payload_cmd, payload, expected_out) 243 244 def testListOpsOnVersion2(self): 245 """Verify that the --list_ops option gives the correct output.""" 246 payload_cmd = payload_info.PayloadCommand( 247 FakeOption(list_ops=True, action='show')) 248 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO) 249 expected_out = """Payload version: 2 250Manifest length: 222 251Number of partitions: 2 252 Number of "root" ops: 1 253 Number of "kernel" ops: 1 254Block size: 4096 255Minor version: 4 256 257root install operations: 258 0: REPLACE_BZ 259 Data offset: 1 260 Data length: 1 261 Destination: 2 extents (3 blocks) 262 (1,1) (2,2) 263kernel install operations: 264 0: SOURCE_COPY 265 Source: 1 extent (1 block) 266 (1,1) 267 Destination: 20 extents (190 blocks) 268 (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) 269 (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) 270""" 271 self.TestCommand(payload_cmd, payload, expected_out) 272 273 def testStatsOnVersion1(self): 274 """Verify that the --stats option works correctly.""" 275 payload_cmd = payload_info.PayloadCommand( 276 FakeOption(stats=True, action='show')) 277 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) 278 expected_out = """Payload version: 1 279Manifest length: 222 280Number of operations: 1 281Number of kernel ops: 1 282Block size: 4096 283Minor version: 4 284Blocks read: 11 285Blocks written: 193 286Seeks when writing: 18 287""" 288 self.TestCommand(payload_cmd, payload, expected_out) 289 290 def testStatsOnVersion2(self): 291 """Verify that the --stats option works correctly on version 2.""" 292 payload_cmd = payload_info.PayloadCommand( 293 FakeOption(stats=True, action='show')) 294 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO) 295 expected_out = """Payload version: 2 296Manifest length: 222 297Number of partitions: 2 298 Number of "root" ops: 1 299 Number of "kernel" ops: 1 300Block size: 4096 301Minor version: 4 302Blocks read: 11 303Blocks written: 193 304Seeks when writing: 18 305""" 306 self.TestCommand(payload_cmd, payload, expected_out) 307 308 def testEmptySignatures(self): 309 """Verify that the --signatures option works with unsigned payloads.""" 310 payload_cmd = payload_info.PayloadCommand( 311 FakeOption(action='show', signatures=True)) 312 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) 313 expected_out = """Payload version: 1 314Manifest length: 222 315Number of operations: 1 316Number of kernel ops: 1 317Block size: 4096 318Minor version: 4 319No metadata signatures stored in the payload 320No payload signatures stored in the payload 321""" 322 self.TestCommand(payload_cmd, payload, expected_out) 323 324 def testSignatures(self): 325 """Verify that the --signatures option shows the present signatures.""" 326 payload_cmd = payload_info.PayloadCommand( 327 FakeOption(action='show', signatures=True)) 328 payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO) 329 payload.AddPayloadSignature(version=1, 330 data='12345678abcdefgh\x00\x01\x02\x03') 331 payload.AddPayloadSignature(data='I am a signature so access is yes.') 332 payload.AddMetadataSignature(data='\x00\x0a\x0c') 333 expected_out = """Payload version: 2 334Manifest length: 222 335Number of partitions: 2 336 Number of "root" ops: 1 337 Number of "kernel" ops: 1 338Block size: 4096 339Minor version: 4 340Metadata signatures blob: file_offset=246 (7 bytes) 341Metadata signatures: (1 entries) 342 version=None, hex_data: (3 bytes) 343 00 0a 0c | ... 344Payload signatures blob: blob_offset=1234 (64 bytes) 345Payload signatures: (2 entries) 346 version=1, hex_data: (20 bytes) 347 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh 348 00 01 02 03 | .... 349 version=None, hex_data: (34 bytes) 350 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature 351 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye 352 73 2e | s. 353""" 354 self.TestCommand(payload_cmd, payload, expected_out) 355 356if __name__ == '__main__': 357 unittest.main() 358