#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import re
import stat
import json
import time
import platform
import subprocess
import signal
from threading import Timer

from _core.utils import get_file_absolute_path
from _core.logger import platform_logger
from _core.exception import ParamError
from _core.constants import DeviceTestType
from _core.constants import FilePermission
from _core.constants import DeviceConnectorType

LOG = platform_logger("Kit")

TARGET_SDK_VERSION = 22

__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
           "get_install_args", "reset_junit_para", "remount", "disable_keyguard",
           "timeout_callback", "unlock_screen", "unlock_device", "get_class"]


def remount(device):
    """Enable hdc root on the device and remount its partitions read-write.

    Args:
        device: the device object; it must provide enable_hdc_root,
            usb_type, connector_command and execute_shell_command
    """
    device.enable_hdc_root()
    cmd = "target mount" \
        if device.usb_type == DeviceConnectorType.hdc else "remount"
    device.connector_command(cmd)
    device.execute_shell_command("remount")
    for partition in ("cust", "product", "hw_product", "version", "system"):
        device.execute_shell_command("mount -o rw,remount /%s" % partition)


def get_class(junit_paras, prefix_char, para_name):
    """Build the "class" argument of a junit command line.

    Args:
        junit_paras: the para dict of junit
        prefix_char: the prefix char of parsed cmd ("-e", "--" or "-s")
        para_name: the key in junit_paras holding the list of tests
    Returns:
        "" when junit_paras has no (or an empty) value for para_name,
        otherwise the prefix part followed by the comma-joined tests.
        A test made of three "#"-separated fields loses its first field.
    Raises:
        ParamError: a test has more than three "#"-separated fields
    """
    tests = junit_paras.get(para_name)
    if not tests:
        return ""

    if prefix_char in ("-e", "-s"):
        result = " %s class " % prefix_char
    elif prefix_char == "--":
        result = " %sclass " % prefix_char
    else:
        result = ""

    test_items = []
    for test in tests:
        fields = test.split("#")
        if len(fields) in (1, 2):
            test_items.append("%s" % test)
        elif len(fields) == 3:
            test_items.append("%s#%s" % (fields[1], fields[2]))
        else:
            raise ParamError("The parameter %s %s is error" % (
                prefix_char, para_name))
    if not result:
        # An unknown prefix is only logged; the tests are still returned.
        LOG.debug("There is unsolved prefix char: %s ." % prefix_char)
    return result + ",".join(test_items)


def junit_para_parse(device, junit_paras, prefix_char="-e"):
    """To parse the para of junit
    Args:
        device: the device running
        junit_paras: the para dict of junit
        prefix_char: the prefix char of parsed cmd
    Returns:
        the new para using in a command like -e testFile xxx
        -e coverage true...
        An empty string is returned when junit_paras is not a dict.
    """
    ret_str = []
    path = "/%s/%s/%s" % ("data", "local", "ajur")
    include_file = "%s/%s" % (path, "includes.txt")
    exclude_file = "%s/%s" % (path, "excludes.txt")
    # Loop-invariant; the chown target keeps its trailing slash.
    chown_cmd = 'chown -R shell:shell %s/' % path

    if not isinstance(junit_paras, dict):
        LOG.warning("The para of junit is not the dict format as required")
        return ""
    # Disable screen keyguard unless explicitly switched off with 'false'
    disable_key_guard = junit_paras.get('disable-keyguard')
    if not disable_key_guard or disable_key_guard[0].lower() != 'false':
        disable_keyguard(device)

    for para_name, para_values in junit_paras.items():
        key = para_name.strip()
        if key == 'test-file-include-filter':
            # NOTE(review): every file is pushed to the same remote path, so
            # only the last one survives - confirm this is intended.
            for file_name in para_values:
                device.push_file(file_name, include_file)
                device.execute_shell_command(chown_cmd)
            ret_str.append(" ".join([prefix_char, 'testFile', include_file]))
        elif key == "test-file-exclude-filter":
            for file_name in para_values:
                device.push_file(file_name, exclude_file)
                device.execute_shell_command(chown_cmd)
            ret_str.append(" ".join([prefix_char, 'notTestFile',
                                     exclude_file]))
        elif key in ("test", "class"):
            ret_str.append(get_class(junit_paras, prefix_char, key))
        elif key == "include-annotation":
            ret_str.append(" ".join([prefix_char, "annotation",
                                     ",".join(para_values)]))
        elif key == "exclude-annotation":
            ret_str.append(" ".join([prefix_char, "notAnnotation",
                                     ",".join(para_values)]))
        else:
            # Any other key is forwarded unchanged (and unstripped).
            ret_str.append(" ".join([prefix_char, para_name,
                                     ",".join(para_values)]))

    return " ".join(ret_str)


def get_include_tests(para_datas, test_types, runner):
    """Register a positive gtest_filter on the runner.

    Args:
        para_datas: the case names when test_types is "class", otherwise
            paths of local files listing one case per line
        test_types: the (stripped) key of the gtest para
        runner: object providing add_instrumentation_arg
    """
    case_list = []
    if test_types == "class":
        case_list = para_datas
    else:
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        for case_file in para_datas:
            with os.fdopen(os.open(case_file, flags, modes),
                           "r") as file_desc:
                case_list.extend(file_desc.read().splitlines())
    runner.add_instrumentation_arg(
        "gtest_filter", ":".join(case_list).replace("#", "."))


def get_all_test_include(para_datas, test_types, runner, request):
    """Register a negative ("-"-prefixed) gtest_filter on the runner.

    Args:
        para_datas: the case names when test_types is "notClass", otherwise
            a list whose first item is the path of a json file; the entries
            found under the DeviceTestType.cpp_test key that contain the
            module name of the request provide the cases
        test_types: the (stripped) key of the gtest para
        runner: object providing add_instrumentation_arg
        request: object providing get_module_name
    """
    case_list = []
    if test_types == "notClass":
        case_list = para_datas
    elif para_datas:
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(para_datas[0], flags, modes),
                       "r") as file_handler:
            json_data = json.load(file_handler)
        for exclude in json_data.get(DeviceTestType.cpp_test, []):
            module_name = request.get_module_name()
            if module_name in exclude:
                case_list.extend(exclude.get(module_name))
    runner.add_instrumentation_arg(
        "gtest_filter",
        "{}{}".format("-", ":".join(case_list)).replace("#", "."))


def gtest_para_parse(gtest_paras, runner, request):
    """To parse the para of gtest
    Args:
        gtest_paras: the para dict of gtest
        runner: the runner on which the gtest_filter args are registered
        request: the request giving the module name for exclude filters
    Returns:
        always an empty string; the parsed filters are handed to the runner
    """
    if not isinstance(gtest_paras, dict):
        LOG.warning("The para of gtest is not the dict format as required")
        return ""

    for para, para_datas in gtest_paras.items():
        test_types = para.strip()
        if test_types in ["test-file-include-filter", "class"]:
            get_include_tests(para_datas, test_types, runner)
        elif test_types in ["all-test-file-exclude-filter", "notClass"]:
            get_all_test_include(para_datas, test_types, runner, request)
    return ""


def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
    """Drop the ignored keys from an already built junit para string.

    Args:
        junit_para_str: the para string, e.g. "-e class a.B -e coverage true"
        prefix_char: the prefix char separating the parameters
        ignore_keys: keys to remove; ["class", "test"] when falsy and not
            a list
    Returns:
        the remaining parameters, each prefixed again and space-joined
    """
    if not ignore_keys and not isinstance(ignore_keys, list):
        ignore_keys = ["class", "test"]
    normal_lines = []
    for line in junit_para_str.split("%s " % prefix_char):
        line = line.strip()
        if not line:
            continue
        if line.split()[0] in ignore_keys:
            continue
        normal_lines.append("{} {}".format(prefix_char, line))
    return " ".join(normal_lines)


def get_install_args(device, app_name, original_args=None):
    """To obtain all the args of app install
    Args:
        original_args: the argus configure in .config file
        device : the device will be installed app
        app_name : the name of the app which will be installed
    Returns:
        All the args
    """
    new_args = list(original_args) if original_args is not None else []
    try:
        sdk_version = device.get_property("ro.build.version.sdk")
        if int(sdk_version) > TARGET_SDK_VERSION:
            new_args.append("-g")
    except (TypeError, ValueError) as error:
        # A missing or non numeric sdk version is logged, "-g" is skipped.
        LOG.error("Obtain the sdk version failed with exception {}".format(
            error))
    if app_name.endswith(".apex"):
        new_args.append("--apex")
    return " ".join(new_args)


def _release_process(proc):
    """Stop proc if it is still running, close its pipes and reap it."""
    try:
        if proc.poll() is None:
            proc.kill()
        for stream in (proc.stdout, proc.stderr):
            if stream is not None:
                stream.close()
        proc.wait()
    except OSError as os_error:
        # Best effort cleanup: never mask the result of the caller.
        LOG.debug("Release aapt process failed: %s" % os_error)


def get_app_name_by_tool(app_path, paths):
    """To obtain the app name by using tool
    Args:
        app_path: the path of app
        paths: handed to get_file_absolute_path to locate "tools/aapt"
    Returns:
        The Pkg Name if found else None
    """
    rex = "^package:\\s+name='(.*?)'.*$"
    aapt_tool_name = "aapt.exe" if os.name == "nt" else "aapt"
    if not app_path:
        LOG.error("get_app_name_by_tool error.")
        return None

    proc_timer = None
    sub_process = None
    try:
        tool_file = get_file_absolute_path(os.path.join(
            "tools", aapt_tool_name), paths)
        LOG.debug("Aapt file is %s" % tool_file)

        if platform.system() == "Linux":
            if not oct(os.stat(tool_file).st_mode)[-3:] == "755":
                os.chmod(tool_file, FilePermission.mode_755)

        cmd = [tool_file, "dump", "badging", app_path]
        timeout = 300
        LOG.info("Execute command %s with %s" % (" ".join(cmd), timeout))

        sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
        proc_timer = Timer(timeout, timeout_callback, [sub_process])
        proc_timer.start()
        # The package name must be return in first line
        output = sub_process.stdout.readline()
        # Blocks until aapt writes a stderr line or closes stderr.
        err_output = sub_process.stderr.readline()
        LOG.debug("The output of aapt is {}".format(output))
        if err_output:
            LOG.debug("The error of aapt is {}".format(err_output))
        if output:
            pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
            if pkg_match is not None:
                LOG.info(
                    "Obtain the app name {} successfully by using "
                    "aapt".format(pkg_match.group(1)))
                return pkg_match.group(1)
        return None
    except (FileNotFoundError, ParamError) as error:
        LOG.debug("Aapt error: %s", error.args)
        return None
    finally:
        if proc_timer:
            proc_timer.cancel()
        if sub_process is not None:
            # Only the first line is needed: do not leave the child process
            # and its two pipes behind.
            _release_process(sub_process)


def timeout_callback(proc):
    """Kill a process whose command ran into its timeout.

    Args:
        proc: the process to kill; its pid is used, and its kill() method
            when no process group led by that pid exists
    """
    try:
        LOG.error("Error: execute command timeout.")
        LOG.error(proc.pid)
        if platform.system() != "Windows":
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except ProcessLookupError:
                # killpg only works when proc leads its own process group
                # (started in a new session); otherwise kill proc itself.
                proc.kill()
        else:
            subprocess.call(
                ["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
                 str(proc.pid)], shell=False)
    except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
        LOG.exception("Timeout callback exception: %s" % error, exc_info=False)


def disable_keyguard(device):
    """Run unlock_screen and then unlock_device on the device."""
    unlock_screen(device)
    unlock_device(device)


def unlock_screen(device):
    """Run "svc power stayon true" on the device, then wait one second."""
    device.execute_shell_command("svc power stayon true")
    time.sleep(1)


def unlock_device(device):
    """Send key event 82 and "wm dismiss-keyguard" to the device.

    One second is waited after each of the two shell commands.
    """
    device.execute_shell_command("input keyevent 82")
    time.sleep(1)
    device.execute_shell_command("wm dismiss-keyguard")
    time.sleep(1)