#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import re
import stat
import json
import time
import platform
import subprocess
import signal
from threading import Timer

from _core.constants import DeviceTestType
from _core.constants import FilePermission
from _core.constants import DeviceConnectorType
from _core.error import ErrorMessage
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_file_absolute_path

LOG = platform_logger("Kit")

# get_install_args() appends "-g" only when the device sdk version is
# strictly greater than this value.
TARGET_SDK_VERSION = 22

# Mount points remounted read-write by remount(), in the order the shell
# commands are issued.
_REMOUNT_POINTS = (
    "/", "/sys_prod", "/chip_prod", "/preload", "/patch_hw", "/vendor",
    "/cust", "/product", "/hw_product", "/version", "/system",
)

# Prefix chars for which get_class() emits a "<prefix> class " header.
_KNOWN_PREFIX_CHARS = ("-e", "--", "-s")

__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
           "get_install_args", "reset_junit_para", "remount", "disable_keyguard",
           "timeout_callback", "unlock_screen", "unlock_device", "get_class",
           "check_device_ohca"]


def remount(device):
    """To remount the partitions of the device as read-write
    Args:
        device: the device object; must provide usb_type,
            connector_command() and execute_shell_command()
    """
    cmd = "target mount" \
        if device.usb_type == DeviceConnectorType.hdc else "remount"
    device.connector_command(cmd)
    device.execute_shell_command("remount")
    for mount_point in _REMOUNT_POINTS:
        device.execute_shell_command(
            "mount -o rw,remount {}".format(mount_point))
    # Issued unconditionally, whatever the connector type is.
    device.connector_command("target mount")


def get_class(junit_paras, prefix_char, para_name):
    """To build the "class" argument of a junit command
    Args:
        junit_paras: the para dict of junit
        prefix_char: the prefix char of parsed cmd
        para_name: the key in junit_paras holding the list of tests
    Returns:
        "" when junit_paras has no (or an empty) value for para_name,
        otherwise " <prefix_char> class " followed by the comma joined
        tests. For an unknown prefix_char only the joined tests are
        returned and a debug message is logged.
    Raises:
        ParamError: a test contains more than two "#" separators
    """
    tests = junit_paras.get(para_name)
    if not tests:
        return ""

    result = ""
    if prefix_char in _KNOWN_PREFIX_CHARS:
        result = " {} class ".format(prefix_char)
    test_items = []
    for test in tests:
        parts = test.split("#")
        if len(parts) in (1, 2):
            # "Class" or "Class#method" is used unchanged
            test_items.append("{}".format(test))
        elif len(parts) == 3:
            # "a#b#c": the first field is dropped, "b#c" is kept
            test_items.append("{}#{}".format(parts[1], parts[2]))
        else:
            raise ParamError(ErrorMessage.Common.Code_0101031.format(prefix_char, para_name))
    if not result:
        LOG.debug("There is unsolved prefix char: {} .".format(prefix_char))
    return result + ",".join(test_items)


def junit_para_parse(device, junit_paras, prefix_char="-e"):
    """To parse the para of junit
    Args:
        device: the device running
        junit_paras: the para dict of junit
        prefix_char: the prefix char of parsed cmd
    Returns:
        the new para using in a command like -e testFile xxx
        -e coverage true...
        "" is returned when junit_paras is not a dict.
    """
    ret_str = []
    ajur_dir = "/{}/{}/{}".format("data", "local", "ajur")
    include_file = "{}/{}".format(ajur_dir, "includes.txt")
    exclude_file = "{}/{}".format(ajur_dir, "excludes.txt")
    chown_cmd = 'chown -R shell:shell {}/'.format(ajur_dir)

    if not isinstance(junit_paras, dict):
        LOG.warning("The para of junit is not the dict format as required")
        return ""
    # Disable screen keyguard unless 'disable-keyguard' is explicitly 'false'
    disable_key_guard = junit_paras.get('disable-keyguard')
    if not disable_key_guard or disable_key_guard[0].lower() != 'false':
        disable_keyguard(device)

    for key, value in junit_paras.items():
        # value is a list object
        para_name = key.strip()
        if para_name == "test-file-include-filter":
            for file_name in value:
                device.push_file(file_name, include_file)
                device.execute_shell_command(chown_cmd)
            ret_str.append(" ".join([prefix_char, "testFile", include_file]))
        elif para_name == "test-file-exclude-filter":
            for file_name in value:
                device.push_file(file_name, exclude_file)
                device.execute_shell_command(chown_cmd)
            ret_str.append(" ".join([prefix_char, "notTestFile", exclude_file]))
        elif para_name in ("test", "class"):
            ret_str.append(get_class(junit_paras, prefix_char, para_name))
        elif para_name == "include-annotation":
            ret_str.append(" ".join([prefix_char, "annotation", ",".join(value)]))
        elif para_name == "exclude-annotation":
            ret_str.append(" ".join([prefix_char, "notAnnotation", ",".join(value)]))

    return " ".join(ret_str)


def get_include_tests(para_datas, test_types, runner):
    """To add the include "gtest_filter" instrumentation arg to the runner
    Args:
        para_datas: the list of cases when test_types is "class",
            otherwise the list of files holding one case per line
        test_types: the type of the para
        runner: the runner; must provide add_instrumentation_arg()
    """
    case_list = []
    if test_types == "class":
        case_list = para_datas
    else:
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        for case_file in para_datas:
            with os.fdopen(os.open(case_file, flags, modes), "r") as file_desc:
                case_list.extend(file_desc.read().splitlines())
    # Cases are ":" separated and "Class#method" becomes "Class.method"
    runner.add_instrumentation_arg("gtest_filter", ":".join(case_list).replace("#", "."))


def get_all_test_include(para_datas, test_types, runner, request):
    """To add the exclude "gtest_filter" instrumentation arg to the runner
    Args:
        para_datas: the list of cases when test_types is "notClass",
            otherwise a list whose first item is a json file; the cases
            are read from the entries under the DeviceTestType.cpp_test
            key that contain the module name of the request
        test_types: the type of the para
        runner: the runner; must provide add_instrumentation_arg()
        request: the request; must provide get_module_name()
    """
    case_list = []
    if test_types == "notClass":
        case_list = para_datas
    elif para_datas:
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        # Only the first file is read
        with os.fdopen(os.open(para_datas[0], flags, modes), "r") as file_handler:
            json_data = json.load(file_handler)
        module_name = request.get_module_name()
        for exclude in json_data.get(DeviceTestType.cpp_test, []):
            if module_name in exclude:
                case_list.extend(exclude.get(module_name))
    # The leading "-" makes the whole pattern list a negative filter
    runner.add_instrumentation_arg("gtest_filter", "{}{}".format("-", ":".join(case_list)).replace("#", "."))


def gtest_para_parse(gtest_paras, runner, request):
    """To parse the para of gtest
    Args:
        gtest_paras: the para dict of gtest
        runner: the runner receiving the parsed "gtest_filter" args
        request: the request, used to obtain the module name
    Returns:
        always ""; the parsed paras are set on the runner
    """
    if not isinstance(gtest_paras, dict):
        LOG.warning("The para of gtest is not the dict format as required")
        return ""
    for para, para_datas in gtest_paras.items():
        test_types = para.strip()
        if test_types in ["test-file-include-filter", "class"]:
            get_include_tests(para_datas, test_types, runner)
        elif test_types in ["all-test-file-exclude-filter", "notClass"]:
            get_all_test_include(para_datas, test_types, runner, request)
    return ""


def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
    """To remove the ignored keys from a junit para string
    Args:
        junit_para_str: the para string like "-e class xxx -e key value"
        prefix_char: the prefix char used in junit_para_str
        ignore_keys: the keys to drop; ["class", "test"] is used when it
            is falsy and not a list (an explicit [] drops nothing)
    Returns:
        the para string rebuilt without the ignored keys
    """
    if not ignore_keys and not isinstance(ignore_keys, list):
        ignore_keys = ["class", "test"]
    normal_lines = []
    for line in junit_para_str.split("{} ".format(prefix_char)):
        line = line.strip()
        if not line:
            continue
        # The first word after the prefix is the key
        if line.split()[0] in ignore_keys:
            continue
        normal_lines.append("{} {}".format(prefix_char, line))
    return " ".join(normal_lines)


def get_install_args(device, app_name, original_args=None):
    """To obtain all the args of app install
    Args:
        original_args: the argus configure in .config file
        device : the device will be installed app
        app_name : the name of the app which will be installed
    Returns:
        All the args
    """
    # Work on a copy so the list of the caller is never modified
    new_args = [] if original_args is None else original_args[:]
    try:
        sdk_version = device.get_property("ro.build.version.sdk")
        if int(sdk_version) > TARGET_SDK_VERSION:
            new_args.append("-g")
    except (TypeError, ValueError) as convert_error:
        LOG.error("Obtain the sdk version failed with exception {}".format(
            convert_error))
    if app_name.endswith(".apex"):
        new_args.append("--apex")
    return " ".join(new_args)


def _first_line(data):
    """Return the first line of data (bytes) with its line feed, if any"""
    head, sep, _ = data.partition(b"\n")
    return head + sep


def get_app_name_by_tool(app_path, paths):
    """To obtain the app name by using tool
    Args:
        app_path: the path of app
        paths: the paths searched for the aapt tool
    Returns:
        The Pkg Name if found else None
    """
    rex = "^package:\\s+name='(.*?)'.*$"
    system = platform.system()
    if system == "Windows":
        aapt_tool_name = "aapt.exe"
    elif system == "Linux":
        aapt_tool_name = "aapt"
    else:
        aapt_tool_name = "aapt_mac"
    if not app_path:
        LOG.error("get_app_name_by_tool error.")
        return None

    proc_timer = None
    try:
        tool_file = get_file_absolute_path(aapt_tool_name, paths)
        LOG.debug("Aapt file is {}".format(tool_file))

        if system in ("Linux", "Darwin"):
            if oct(os.stat(tool_file).st_mode)[-3:] != "755":
                os.chmod(tool_file, FilePermission.mode_755)

        cmd = [tool_file, "dump", "badging", app_path]
        timeout = 300
        LOG.info("Execute command {} with {}".format(" ".join(cmd), timeout))

        sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
        proc_timer = Timer(timeout, timeout_callback, [sub_process])
        proc_timer.start()
        # communicate() drains both pipes together, waits for the tool to
        # exit and closes the pipes. Reading one pipe after the other can
        # block forever once the other pipe is full, and leaves the child
        # un-reaped.
        stdout_data, stderr_data = sub_process.communicate()
        # The package name must be return in first line
        output = _first_line(stdout_data)
        error = _first_line(stderr_data)
        LOG.debug("The output of aapt is {}".format(output))
        if error:
            LOG.debug("The error of aapt is {}".format(error))
        if output:
            pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
            if pkg_match is not None:
                LOG.info(
                    "Obtain the app name {} successfully by using "
                    "aapt".format(pkg_match.group(1)))
                return pkg_match.group(1)
        return None
    except (FileNotFoundError, ParamError) as aapt_error:
        LOG.debug("Aapt error: {}".format(aapt_error.args))
        return None
    finally:
        if proc_timer:
            proc_timer.cancel()


def timeout_callback(proc):
    """To kill a process whose command timed out
    Args:
        proc: the process to kill; must provide pid (and kill() for the
            fallback used on non-Windows platforms)
    """
    try:
        LOG.error("Error: execute command timeout.")
        LOG.error(proc.pid)
        if platform.system() != "Windows":
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except ProcessLookupError:
                # proc.pid is a process group id only when the child leads
                # its own group; otherwise kill the child directly.
                proc.kill()
        else:
            subprocess.call(
                ["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
                 str(proc.pid)], shell=False)
    except (FileNotFoundError, ProcessLookupError, KeyboardInterrupt,
            AttributeError) as error:
        # exc_info belongs to the logger call, not to str.format()
        LOG.exception("Timeout callback exception: {}".format(error),
                      exc_info=False)


def disable_keyguard(device):
    """To run unlock_screen() and then unlock_device() on the device
    Args:
        device: the device object; must provide execute_shell_command()
    """
    unlock_screen(device)
    unlock_device(device)


def unlock_screen(device):
    """To send "svc power stayon true" to the device and wait one second
    Args:
        device: the device object; must provide execute_shell_command()
    """
    device.execute_shell_command("svc power stayon true")
    time.sleep(1)


def unlock_device(device):
    """To send "input keyevent 82" and "wm dismiss-keyguard" to the
    device, waiting one second after each command
    Args:
        device: the device object; must provide execute_shell_command()
    """
    device.execute_shell_command("input keyevent 82")
    time.sleep(1)
    device.execute_shell_command("wm dismiss-keyguard")
    time.sleep(1)


def check_device_ohca(device):
    """To check the device; the device is not inspected
    Args:
        device: the device object (unused)
    Returns:
        always False
    """
    return False