1#!/usr/bin/env vpython3 2# 3# Copyright 2021 The ChromiumOS Authors 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import re 8import os 9import subprocess 10 11from google.protobuf import json_format 12from google.protobuf.descriptor import FieldDescriptor 13 14# Move to the script's directory for running commands and importing proto 15# bindings. 16config_dir = subprocess.run( 17 ['git', 'rev-parse', '--show-toplevel'], 18 check=True, 19 capture_output=True, 20).stdout.decode('utf-8').strip() 21os.chdir(config_dir) 22 23import sys 24 25sys.path.append(os.path.join(config_dir, 'payload_utils')) 26sys.path.append(os.path.join(config_dir, 'python')) 27 28from chromiumos.config.payload import flat_config_pb2 29from chromiumos.config.api import component_pb2 30from chromiumos.test.api import dut_attribute_pb2 31 32from common import proto_utils 33 34SKIP_FOOTER = 'Skip-Dut-Attrs-Check' 35 36 37class DutAttributeChangeError(Exception): 38 """Raised when a commit makes an invalid change to DutAttributes.""" 39 40 41def attributes_at_ref(ref: str, 42 id_only=False) -> dut_attribute_pb2.DutAttributeList: 43 """Returns dut_attributes.jsonproto at a particular git ref. 44 45 Args: 46 ref - Git ref to read the DutAttributeList proto from. 47 id_only - If true, return a set of unique DutAttribute.Id values. 48 """ 49 50 proc = subprocess.run( 51 ['git', 'cat-file', 'blob', f'{ref}:generated/dut_attributes.jsonproto'], 52 check=True, 53 capture_output=True, 54 ) 55 56 # parse into DutAttributeList 57 dut_attr_list = dut_attribute_pb2.DutAttributeList() 58 json_format.Parse(proc.stdout, dut_attr_list) 59 60 if id_only: 61 return set([da.id.value for da in dut_attr_list.dut_attributes]) 62 return dut_attr_list.dut_attributes 63 64 65def check_attributes_removed(): 66 """Enforce semantics that DutAttributes cannot be removed once committed.""" 67 68 git_footers_proc = subprocess.run( 69 ['git-footers', '--key', SKIP_FOOTER, 'HEAD'], 70 check=True, 71 capture_output=True, 72 ) 73 74 if git_footers_proc.stdout: 75 print(f'{SKIP_FOOTER} footer set, skipping DUT attributes check.') 76 return 77 78 try: 79 prev_dut_attrs = attributes_at_ref('HEAD~1', id_only=True) 80 except subprocess.CalledProcessError: 81 print("No current DutAttributes, skipping removal check.") 82 return 83 84 curr_dut_attrs = attributes_at_ref('HEAD', id_only=True) 85 86 removed_attr_ids = prev_dut_attrs - curr_dut_attrs 87 if removed_attr_ids: 88 print( 89 "DutAttributes cannot be removed because branched code may depend on them.\n\n" 90 "To override this dcheck, add a footer to your commit:\n" 91 f" '{SKIP_FOOTER}: <explanation>'\n\n" 92 "Removed attributes:\n" + 93 "\n".join(f" {attr_id}" for attr_id in removed_attr_ids)) 94 sys.exit(1) 95 96 print('no DutAttributes removed.') 97 98 99# Define the message we're rooted in for each config source 100ROOT_MSG_MAP = { 101 'flat_config_source': flat_config_pb2.FlatConfig, 102 'hwid_source': component_pb2.Component, 103} 104 105 106def check_valid_attributes(): 107 """Check that attributes are valid.""" 108 109 NON_VALUE_TYPES = [ 110 FieldDescriptor.TYPE_MESSAGE, 111 FieldDescriptor.TYPE_GROUP, 112 FieldDescriptor.TYPE_BYTES, 113 ] 114 115 FLOAT_TYPES = [ 116 FieldDescriptor.TYPE_DOUBLE, 117 FieldDescriptor.TYPE_FLOAT, 118 ] 119 120 def format_error(attr, msg): 121 """Format an error message for the given attribute.""" 122 return "error in id '{attr}': {msg}".format(attr=attr.id.value, msg=msg) 123 124 def validate_attr_fields(attr): 125 """Validate the field specs of an attribute, return list of errors.""" 126 # Find oneof field (if any) to check 127 fields = [] 128 name = attr.WhichOneof("data_source") 129 if name: 130 if name == "tle_source": 131 print("Skipping '{}', no fields are defined.".format(name)) 132 return [] 133 fields = getattr(attr, name).fields 134 135 root_msg = ROOT_MSG_MAP.get(name) 136 if not root_msg: 137 print("Skipping '{}', no root message defined.".format(name)) 138 return [] 139 140 errors = [] 141 for field_spec in fields: 142 field_infos = proto_utils.resolve_field_path(root_msg, field_spec.path) 143 144 fields = field_spec.path.split('.') 145 for ii, (field, info) in enumerate(zip(fields, field_infos)): 146 # Check for non-existent fields 147 if not info: 148 msg_name = root_msg.DESCRIPTOR.name 149 if ii > 0: 150 msg_name = field_infos[ii - 1].typename 151 152 errors.append( 153 format_error( 154 attr, 155 "'{}' -- field '{}' does not exist in message '{}'".format( 156 field_spec.path, field, msg_name))) 157 break 158 159 # Check for repeated fields 160 if info.repeated: 161 errors.append( 162 format_error( 163 attr, 164 "'{}' -- field '{}' is repeated, must be singular".format( 165 field_spec.path, field))) 166 167 # Check that the terminal field is a value type 168 terminal_info = field_infos[-1] 169 if terminal_info: 170 if terminal_info.typeid in NON_VALUE_TYPES: 171 errors.append( 172 format_error( 173 attr, "'{}' -- last field '{}' must be value type".format( 174 field_spec.path, terminal_info.name))) 175 176 # and that it's not a floating point value 177 if terminal_info.typeid in FLOAT_TYPES: 178 errors.append( 179 format_error( 180 attr, "'{}' -- floating point fields are not allowed".format( 181 field_spec.path))) 182 183 return errors 184 185 # Check dut attribute ids and build a map of name => attribute id 186 def validate_attr(attr): 187 """Validate the properties of an attribute itself, return list of errors.""" 188 attr_map = {} 189 190 def maybe_add_label(attr, label): 191 """Record a label for an attribute if it doesn't already exist.""" 192 if label in attr_map: 193 return [ 194 format_error( 195 attr, "label '{}' already exists in attribute '{}'".format( 196 label, attr_map[label])) 197 ] 198 attr_map[label] = attr.id.value 199 return [] 200 201 # Check dut attributes names are formatted properly and not repeated 202 errors = [] 203 if not re.match("^([a-zA-Z0-9]+-?)+[a-zA-Z0-9]+$", attr.id.value): 204 errors.append( 205 format_error(attr, "has invalid id '{}'".format(attr.id.value))) 206 207 errors += maybe_add_label(attr, attr.id.value) 208 for label in attr.aliases: 209 errors += maybe_add_label(attr, label) 210 return errors 211 212 curr_dut_attrs = attributes_at_ref('HEAD') 213 214 # Multiple loops to preserve error ordering 215 errors = [] 216 for attr in curr_dut_attrs: 217 errors += validate_attr(attr) 218 219 for attr in curr_dut_attrs: 220 errors += validate_attr_fields(attr) 221 222 if not errors: 223 return 224 225 print("One or more errors checking DutAttributes:") 226 for error in errors: 227 print(" {}".format(error)) 228 sys.exit(1) 229 230 231def main(): 232 stages = [ 233 ('Check for removed attributes', check_attributes_removed), 234 ('Validate attribute definitions', check_valid_attributes), 235 ] 236 237 for msg, stage in stages: 238 print(f'== {msg}') 239 stage() 240 print() 241 242 243if __name__ == '__main__': 244 main() 245