1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# Copyright (c) 2021 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import sys 17import os 18import argparse 19import shutil 20import xml.etree.ElementTree as ET 21 22sys.path.append( 23 os.path.dirname(os.path.dirname(os.path.dirname( 24 os.path.abspath(__file__))))) 25from scripts.util.file_utils import read_json_file, write_json_file # noqa: E402 26from scripts.util import build_utils # noqa: E402 27 28 29def copy_dir(src: str, dest: str) -> list: 30 if not os.path.exists(src): 31 raise Exception("src dir '{}' doesn't exist.".format(src)) 32 if not os.path.exists(dest): 33 os.makedirs(dest, exist_ok=True) 34 result_files = [] 35 src_files = [] 36 for root, _, files in os.walk(src): 37 for _file in files: 38 file_path = os.path.join(root, _file) 39 src_files.append(file_path) 40 for src_path in src_files: 41 if os.path.islink(src_path): 42 continue 43 file_relpath = os.path.relpath(src_path, src) 44 dest_path = os.path.join(dest, file_relpath) 45 dest_dir = os.path.dirname(dest_path) 46 if not os.path.exists(dest_dir): 47 os.makedirs(dest_dir, exist_ok=True) 48 shutil.copy2(src_path, dest_path) 49 result_files.append(src_path) 50 result_files.append(dest_path) 51 return result_files 52 53 54def _resources_with_xml_v1(root, testcase_target_name: str, test_resource_path: str, 55 part_build_out_path: str, resource_output_path: str) -> list: 56 _out_resources_list = [] 57 for target in root: 58 if target.attrib.get('name') != testcase_target_name: 59 continue 60 for _depend in target: 61 _findpath = _depend.attrib.get('findpath') 62 _resource_file = _depend.attrib.get('resource') 63 if _findpath == 'res': 64 _resource_src = os.path.join(test_resource_path, 65 _resource_file) 66 _res_dest = os.path.join(resource_output_path, _resource_file) 67 elif _findpath == 'out': 68 if not os.path.exists(_resource_file): 69 __dir_name = _resource_file.split('/')[0] 70 _resource_file_new = os.path.join(__dir_name, 71 _resource_file) 72 _resource_src_new = os.path.join(part_build_out_path, 73 _resource_file_new) 74 if os.path.exists(_resource_src_new): 75 _resource_src = _resource_src_new 76 _res_dest = os.path.join(resource_output_path, 77 _resource_file) 78 else: 79 _resource_src = '' 80 _res_dest = '' 81 else: 82 _resource_src = os.path.join(part_build_out_path, 83 _resource_file) 84 _res_dest = os.path.join(resource_output_path, 85 _resource_file) 86 else: 87 raise Exception( 88 "resource findpath type '{}' not support.".format( 89 _findpath)) 90 if _resource_src: 91 _out_resources_list.append({ 92 "src": 93 os.path.relpath(_resource_src), 94 "dest": 95 os.path.relpath(_res_dest) 96 }) 97 return _out_resources_list 98 99 100def _parse_res_value(value) -> str: 101 res_file = value.split('->')[0].strip() 102 return res_file 103 104 105def _resources_with_xml_v2(root, testcase_target_name: str, test_resource_path: str, 106 part_build_out_path: str, resource_output_path: str) -> list: 107 _out_resources_list = [] 108 for target in root: 109 if target.attrib.get('name') != testcase_target_name: 110 continue 111 for child in target: 112 if child.tag != 'preparer': 113 continue 114 for _option in child: 115 if _option.attrib.get('name') != 'push': 116 continue 117 _src_type = _option.attrib.get('src') 118 _resource_file_val = _option.attrib.get('value') 119 _resource_file = _parse_res_value(_resource_file_val) 120 if _src_type == 'res': 121 _resource_src = os.path.join(test_resource_path, 122 _resource_file) 123 _res_dest = os.path.join(resource_output_path, 124 _resource_file) 125 elif _src_type == 'out': 126 _resource_src = os.path.join(part_build_out_path, 127 _resource_file) 128 _res_dest = os.path.join(resource_output_path, 129 _resource_file) 130 else: 131 raise Exception( 132 "resource src type '{}' not support.".format( 133 _src_type)) 134 if _resource_src: 135 _out_resources_list.append({ 136 "src": 137 os.path.relpath(_resource_src), 138 "dest": 139 os.path.relpath(_res_dest) 140 }) 141 return _out_resources_list 142 143 144def find_testcase_resources(resource_config_file: str, testcase_target_name: str, 145 test_resource_path: str, part_build_out_path: str, 146 resource_output_path: str) -> list: 147 if not os.path.exists(resource_config_file): 148 return [] 149 tree = ET.parse(resource_config_file) 150 root = tree.getroot() 151 if root.attrib.get('ver') == '2.0': 152 _resources_list = _resources_with_xml_v2(root, testcase_target_name, 153 test_resource_path, 154 part_build_out_path, 155 resource_output_path) 156 else: 157 _resources_list = _resources_with_xml_v1(root, testcase_target_name, 158 test_resource_path, 159 part_build_out_path, 160 resource_output_path) 161 # copy ohos_test.xml 162 _resources_list.append({ 163 "src": 164 resource_config_file, 165 "dest": 166 os.path.join(resource_output_path, 167 os.path.basename(resource_config_file)) 168 }) 169 return _resources_list 170 171 172def copy_testcase_resources(resource_infos: list) -> list: 173 result_dest_list = [] 174 for resource_info in resource_infos: 175 src_file = resource_info.get('src') 176 if not os.path.exists(src_file): 177 print("warning: testcase resource {} doesn't exist.".format( 178 src_file)) 179 return result_dest_list 180 dest_file = resource_info.get('dest') 181 dest_dir = os.path.dirname(dest_file) 182 if os.path.isdir(src_file): 183 result_files = copy_dir(src_file, dest_file) 184 result_dest_list.extend(result_files) 185 else: 186 if not os.path.exists(dest_dir): 187 os.makedirs(dest_dir, exist_ok=True) 188 shutil.copy2(src_file, dest_file) 189 if src_file: 190 result_dest_list.append(src_file) 191 result_dest_list.append(dest_file) 192 return result_dest_list 193 194 195def _get_subsystem_name(part_name: str): 196 subsystem_parts_file = 'build_configs/parts_info/components.json' 197 subsystem_parts_info = read_json_file(subsystem_parts_file) 198 if subsystem_parts_info is None: 199 raise Exception("read file '{}' failed.".format(subsystem_parts_file)) 200 for _part_name, p_dict in subsystem_parts_info.items(): 201 if part_name == _part_name: 202 return p_dict.get("subsystem") 203 return None 204 205 206def find_project_root(start_path: str, marker: str = '.gn') -> str: 207 """ 208 :param start_path: 起始路径 209 :param marker: 标记文件名, 默认使用 .gn 210 :return: 项目根目录路径 211 """ 212 current_path = os.path.abspath(start_path) 213 while True: 214 if os.path.isfile(os.path.join(current_path, marker)): 215 return current_path 216 parent_path = os.path.dirname(current_path) 217 if current_path == parent_path: # 达到文件系统的根目录 218 raise Exception(f"Cant find the root of the project containing '{marker}'.") 219 current_path = parent_path 220 221 222def _get_subsystem_path(part_name: str) -> str: 223 subsystem_name = _get_subsystem_name(part_name) 224 if subsystem_name is None: 225 return None 226 project_root = find_project_root(__file__) 227 subsystem_build_config_file = os.path.join(project_root, 'build', 'subsystem_config.json') 228 config_info = read_json_file(subsystem_build_config_file) 229 if config_info is None: 230 raise Exception( 231 "read file '{}' failed.".format(subsystem_build_config_file)) 232 info = config_info.get(subsystem_name) 233 if info is None: 234 raise Exception( 235 "subsystem '{}' info doesn't exist.".format(subsystem_name)) 236 subsystem_paths = info.get('path') 237 return subsystem_paths 238 239 240def _parse_module_out_path(module_out_path: str): 241 split_re = module_out_path.split('/', 1) 242 part_name = split_re[0] 243 module_name = split_re[1] 244 return part_name, module_name 245 246 247def _find_resource_config_file(config_file_name: str, subsystem_path: str, module_name: str) -> str: 248 resource_config_file = os.path.join('../../', subsystem_path, 249 'test/resource', module_name, 250 config_file_name) 251 # compatibility 252 if not os.path.exists(resource_config_file): 253 module_dirs = module_name.split('/') 254 _dirs_num = len(module_dirs) 255 _dir_name = os.path.dirname(resource_config_file) 256 while _dirs_num > 1: 257 _dir_name = os.path.dirname(_dir_name) 258 resource_config_file = os.path.join(_dir_name, config_file_name) 259 if os.path.exists(resource_config_file): 260 break 261 _dirs_num -= 1 262 return resource_config_file 263 264 265def _get_res_config_file(module_out_path: str) -> str: 266 part_name, module_name = _parse_module_out_path(module_out_path) 267 subsystem_paths = _get_subsystem_path(part_name) 268 resource_config_files = [] 269 if not subsystem_paths: 270 return resource_config_files 271 for _path in subsystem_paths: 272 resource_config_file = _find_resource_config_file( 273 'ohos_test.xml', _path, module_name) 274 if not os.path.exists(resource_config_file): 275 resource_config_file = _find_resource_config_file( 276 'harmony_test.xml', _path, module_name) 277 resource_config_files.append(resource_config_file) 278 return resource_config_files 279 280 281def _get_resources_list(resource_config_file: str, testcase_target_name: str, 282 part_build_out_path: str, resource_output_path: str) -> list: 283 if not os.path.exists(resource_config_file): 284 raise Exception( 285 "testcase '{}' resource_config_file config incorrect.".format( 286 testcase_target_name)) 287 test_resource_path = os.path.dirname(resource_config_file) 288 resources_list = find_testcase_resources(resource_config_file, 289 testcase_target_name, 290 test_resource_path, 291 part_build_out_path, 292 resource_output_path) 293 return resources_list 294 295 296def _get_resources_list_auto_match(module_out_path: str, testcase_target_name: str, 297 part_build_out_path: str, resource_output_path: str) -> list: 298 resource_config_files = _get_res_config_file(module_out_path) 299 all_resources_list = [] 300 for resource_config_file in resource_config_files: 301 if resource_config_file is None or not os.path.exists( 302 resource_config_file): 303 continue 304 test_resource_path = os.path.dirname(resource_config_file) 305 resources_list = find_testcase_resources(resource_config_file, 306 testcase_target_name, 307 test_resource_path, 308 part_build_out_path, 309 resource_output_path) 310 all_resources_list.extend(resources_list) 311 return all_resources_list 312 313 314def main() -> int: 315 parser = argparse.ArgumentParser() 316 parser.add_argument('--resource-config-file', required=False) 317 parser.add_argument('--testcase-target-name', required=True) 318 parser.add_argument('--part-build-out-path', required=True) 319 parser.add_argument('--resource-output-path', required=True) 320 parser.add_argument('--module-out-path', required=False) 321 parser.add_argument('--output-file', required=True) 322 parser.add_argument('--depfile', required=False) 323 args = parser.parse_args() 324 if not args.resource_config_file: 325 if not args.module_out_path: 326 raise Exception('Missing parameter module_out_path.') 327 resources_list = _get_resources_list_auto_match( 328 args.module_out_path, args.testcase_target_name, 329 args.part_build_out_path, args.resource_output_path) 330 else: 331 resources_list = _get_resources_list(args.resource_config_file, 332 args.testcase_target_name, 333 args.part_build_out_path, 334 args.resource_output_path) 335 if not resources_list: 336 return 0 337 write_json_file(args.output_file, resources_list) 338 result_dest_list = copy_testcase_resources(resources_list) 339 if args.depfile and result_dest_list: 340 result_dest_list.sort() 341 build_utils.write_depfile(args.depfile, 342 args.output_file, 343 result_dest_list, 344 add_pydeps=False) 345 return 0 346 347 348if __name__ == '__main__': 349 sys.exit(main()) 350