1#!/usr/bin/env python 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18Check dynamic partition sizes. 19 20usage: check_partition_sizes [info.txt] 21 22Check dump-super-partitions-info procedure for expected keys in info.txt. In 23addition, *_image (e.g. system_image, vendor_image, etc.) must be defined for 24each partition in dynamic_partition_list. 25 26Exit code is 0 if successful and non-zero if any failures. 27""" 28 29from __future__ import print_function 30 31import logging 32import sys 33 34import common 35import sparse_img 36 37if sys.hexversion < 0x02070000: 38 print("Python 2.7 or newer is required.", file=sys.stderr) 39 sys.exit(1) 40 41logger = logging.getLogger(__name__) 42 43 44class Expression(object): 45 def __init__(self, desc, expr, value=None): 46 # Human-readable description 47 self.desc = str(desc) 48 # Numeric expression 49 self.expr = str(expr) 50 # Value of expression 51 self.value = int(expr) if value is None else value 52 53 def CheckLe(self, other, level=logging.ERROR): 54 format_args = (self.desc, other.desc, self.expr, self.value, 55 other.expr, other.value) 56 if self.value <= other.value: 57 logger.info("%s is less than or equal to %s:\n%s == %d <= %s == %d", 58 *format_args) 59 else: 60 msg = "{} is greater than {}:\n{} == {} > {} == {}".format(*format_args) 61 if level == logging.ERROR: 62 raise RuntimeError(msg) 63 else: 64 logger.log(level, msg) 65 66 def CheckLt(self, other, level=logging.ERROR): 67 format_args = (self.desc, other.desc, self.expr, self.value, 68 other.expr, other.value) 69 if self.value < other.value: 70 logger.info("%s is less than %s:\n%s == %d < %s == %d", 71 *format_args) 72 else: 73 msg = "{} is greater than or equal to {}:\n{} == {} >= {} == {}".format( 74 *format_args) 75 if level == logging.ERROR: 76 raise RuntimeError(msg) 77 else: 78 logger.log(level, msg) 79 80 def CheckEq(self, other): 81 format_args = (self.desc, other.desc, self.expr, self.value, 82 other.expr, other.value) 83 if self.value == other.value: 84 logger.info("%s equals %s:\n%s == %d == %s == %d", *format_args) 85 else: 86 raise RuntimeError("{} does not equal {}:\n{} == {} != {} == {}".format( 87 *format_args)) 88 89 90# A/B feature flags 91class DeviceType(object): 92 NONE = 0 93 AB = 1 94 RVAB = 2 # retrofit Virtual-A/B 95 VAB = 3 96 97 @staticmethod 98 def Get(info_dict): 99 if info_dict.get("ab_update") != "true": 100 return DeviceType.NONE 101 if info_dict.get("virtual_ab_retrofit") == "true": 102 return DeviceType.RVAB 103 if info_dict.get("virtual_ab") == "true": 104 return DeviceType.VAB 105 return DeviceType.AB 106 107 108# Dynamic partition feature flags 109class Dap(object): 110 NONE = 0 111 RDAP = 1 112 DAP = 2 113 114 @staticmethod 115 def Get(info_dict): 116 if info_dict.get("use_dynamic_partitions") != "true": 117 return Dap.NONE 118 if info_dict.get("dynamic_partition_retrofit") == "true": 119 return Dap.RDAP 120 return Dap.DAP 121 122 123class DynamicPartitionSizeChecker(object): 124 def __init__(self, info_dict): 125 if "super_partition_size" in info_dict: 126 if "super_partition_warn_limit" not in info_dict: 127 info_dict["super_partition_warn_limit"] = \ 128 int(info_dict["super_partition_size"]) * 95 // 100 129 if "super_partition_error_limit" not in info_dict: 130 info_dict["super_partition_error_limit"] = \ 131 int(info_dict["super_partition_size"]) 132 self.info_dict = info_dict 133 134 def _ReadSizeOfPartition(self, name): 135 # Tests uses *_image_size instead (to avoid creating empty sparse images 136 # on disk) 137 if name + "_image_size" in self.info_dict: 138 return int(self.info_dict[name + "_image_size"]) 139 return sparse_img.GetImagePartitionSize(self.info_dict[name + "_image"]) 140 141 # Round result to BOARD_SUPER_PARTITION_ALIGNMENT 142 def _RoundPartitionSize(self, size): 143 alignment = self.info_dict.get("super_partition_alignment") 144 if alignment is None: 145 return size 146 return (size + alignment - 1) // alignment * alignment 147 148 def _CheckSuperPartitionSize(self): 149 info_dict = self.info_dict 150 super_block_devices = \ 151 info_dict.get("super_block_devices", "").strip().split() 152 size_list = [int(info_dict.get("super_{}_device_size".format(b), "0")) 153 for b in super_block_devices] 154 sum_size = Expression("sum of super partition block device sizes", 155 "+".join(str(size) for size in size_list), 156 sum(size_list)) 157 super_partition_size = Expression("BOARD_SUPER_PARTITION_SIZE", 158 info_dict["super_partition_size"]) 159 sum_size.CheckEq(super_partition_size) 160 161 def _CheckSumOfPartitionSizes(self, max_size, partition_names, 162 warn_size=None, error_size=None): 163 partition_size_list = [self._RoundPartitionSize( 164 self._ReadSizeOfPartition(p)) for p in partition_names] 165 sum_size = Expression("sum of sizes of {}".format(partition_names), 166 "+".join(str(size) for size in partition_size_list), 167 sum(partition_size_list)) 168 sum_size.CheckLe(max_size) 169 if error_size: 170 sum_size.CheckLe(error_size) 171 if warn_size: 172 sum_size.CheckLe(warn_size, level=logging.WARNING) 173 174 def _NumDeviceTypesInSuper(self): 175 slot = DeviceType.Get(self.info_dict) 176 dap = Dap.Get(self.info_dict) 177 178 if dap == Dap.NONE: 179 raise RuntimeError("check_partition_sizes should only be executed on " 180 "builds with dynamic partitions enabled") 181 182 # Retrofit dynamic partitions: 1 slot per "super", 2 "super"s on the device 183 if dap == Dap.RDAP: 184 if slot != DeviceType.AB: 185 raise RuntimeError("Device with retrofit dynamic partitions must use " 186 "regular (non-Virtual) A/B") 187 return 1 188 189 # Launch DAP: 1 super on the device 190 assert dap == Dap.DAP 191 192 # DAP + A/B: 2 slots in super 193 if slot == DeviceType.AB: 194 return 2 195 196 # DAP + retrofit Virtual A/B: same as A/B 197 if slot == DeviceType.RVAB: 198 return 2 199 200 # DAP + Launch Virtual A/B: 1 *real* slot in super (2 virtual slots) 201 if slot == DeviceType.VAB: 202 return 1 203 204 # DAP + non-A/B: 1 slot in super 205 assert slot == DeviceType.NONE 206 return 1 207 208 def _CheckAllPartitionSizes(self): 209 info_dict = self.info_dict 210 num_slots = self._NumDeviceTypesInSuper() 211 size_limit_suffix = (" / %d" % num_slots) if num_slots > 1 else "" 212 213 # Check sum(all partitions) <= super partition (/ 2 for A/B devices launched 214 # with dynamic partitions) 215 if "super_partition_size" in info_dict and \ 216 "dynamic_partition_list" in info_dict: 217 max_size = Expression( 218 "BOARD_SUPER_PARTITION_SIZE{}".format(size_limit_suffix), 219 int(info_dict["super_partition_size"]) // num_slots) 220 warn_limit = Expression( 221 "BOARD_SUPER_PARTITION_WARN_LIMIT{}".format(size_limit_suffix), 222 int(info_dict["super_partition_warn_limit"]) // num_slots) 223 error_limit = Expression( 224 "BOARD_SUPER_PARTITION_ERROR_LIMIT{}".format(size_limit_suffix), 225 int(info_dict["super_partition_error_limit"]) // num_slots) 226 partitions_in_super = info_dict["dynamic_partition_list"].strip().split() 227 # In the vab case, factory OTA will allocate space on super to install 228 # the system_other partition. So add system_other to the partition list. 229 if DeviceType.Get(self.info_dict) == DeviceType.VAB and ( 230 "system_other_image" in info_dict or 231 "system_other_image_size" in info_dict): 232 partitions_in_super.append("system_other") 233 self._CheckSumOfPartitionSizes(max_size, partitions_in_super, 234 warn_limit, error_limit) 235 236 groups = info_dict.get("super_partition_groups", "").strip().split() 237 238 # For each group, check sum(partitions in group) <= group size 239 for group in groups: 240 if "super_{}_group_size".format(group) in info_dict and \ 241 "super_{}_partition_list".format(group) in info_dict: 242 group_size = Expression( 243 "BOARD_{}_SIZE".format(group), 244 int(info_dict["super_{}_group_size".format(group)])) 245 self._CheckSumOfPartitionSizes( 246 group_size, 247 info_dict["super_{}_partition_list".format(group)].strip().split()) 248 249 # Check sum(all group sizes) <= super partition (/ 2 for A/B devices 250 # launched with dynamic partitions) 251 if "super_partition_size" in info_dict: 252 group_size_list = [int(info_dict.get( 253 "super_{}_group_size".format(group), 0)) for group in groups] 254 sum_size = Expression("sum of sizes of {}".format(groups), 255 "+".join(str(size) for size in group_size_list), 256 sum(group_size_list)) 257 max_size = Expression( 258 "BOARD_SUPER_PARTITION_SIZE{}".format(size_limit_suffix), 259 int(info_dict["super_partition_size"]) // num_slots) 260 # Retrofit DAP will build metadata as part of super image. 261 if Dap.Get(info_dict) == Dap.RDAP: 262 sum_size.CheckLe(max_size) 263 return 264 265 sum_size.CheckLt(max_size) 266 # Display a warning if group size + 1M >= super size 267 minimal_metadata_size = 1024 * 1024 # 1MiB 268 sum_size_plus_metadata = Expression( 269 "sum of sizes of {} plus 1M metadata".format(groups), 270 "+".join(str(size) for size in 271 group_size_list + [minimal_metadata_size]), 272 sum(group_size_list) + minimal_metadata_size) 273 sum_size_plus_metadata.CheckLe(max_size, level=logging.WARNING) 274 275 def Run(self): 276 self._CheckAllPartitionSizes() 277 if self.info_dict.get("dynamic_partition_retrofit") == "true": 278 self._CheckSuperPartitionSize() 279 280 281def CheckPartitionSizes(inp): 282 if isinstance(inp, str): 283 info_dict = common.LoadDictionaryFromFile(inp) 284 return DynamicPartitionSizeChecker(info_dict).Run() 285 if isinstance(inp, dict): 286 return DynamicPartitionSizeChecker(inp).Run() 287 raise ValueError("{} is not a dictionary or a valid path".format(inp)) 288 289 290def main(argv): 291 args = common.ParseOptions(argv, __doc__) 292 if len(args) != 1: 293 common.Usage(__doc__) 294 sys.exit(1) 295 common.InitLogging() 296 CheckPartitionSizes(args[0]) 297 298 299if __name__ == "__main__": 300 try: 301 common.CloseInheritedPipes() 302 main(sys.argv[1:]) 303 except common.ExternalError: 304 logger.exception("\n ERROR:\n") 305 sys.exit(1) 306 finally: 307 common.Cleanup() 308