1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import re 21import subprocess 22import zipfile 23import stat 24import time 25import json 26from dataclasses import dataclass 27from tempfile import TemporaryDirectory 28from tempfile import NamedTemporaryFile 29 30from xdevice import ITestKit 31from xdevice import platform_logger 32from xdevice import Plugin 33from xdevice import ParamError 34from xdevice import get_file_absolute_path 35from xdevice import get_config_value 36from xdevice import exec_cmd 37from xdevice import ConfigConst 38from xdevice import AppInstallError 39from xdevice import convert_serial 40from xdevice import check_path_legal 41from xdevice import modify_props 42from xdevice import get_app_name_by_tool 43from xdevice import remount 44from xdevice import disable_keyguard 45from xdevice import get_class 46 47from ohos.constants import CKit 48from ohos.environment.dmlib import HdcHelper 49from ohos.environment.dmlib import CollectingOutputReceiver 50 51__all__ = ["STSKit", "CommandKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit", 52 "ConfigKit", "AppInstallKit", "ComponentKit", "PermissionKit", 53 "junit_dex_para_parse", "oh_jsunit_para_parse"] 54 55MAX_WAIT_COUNT = 4 56TARGET_SDK_VERSION = 22 57 58LOG = platform_logger("Kit") 59 60 61@Plugin(type=Plugin.TEST_KIT, id=CKit.command) 62class CommandKit(ITestKit): 63 64 def __init__(self): 65 self.run_command = [] 66 self.teardown_command = [] 67 self.paths = "" 68 69 def __check_config__(self, config): 70 self.paths = get_config_value('paths', config) 71 self.teardown_command = get_config_value('teardown', config) 72 self.run_command = get_config_value('shell', config) 73 74 def __setup__(self, device, **kwargs): 75 del kwargs 76 LOG.debug("CommandKit setup, device:{}, params:{}". 77 format(device, self.get_plugin_config().__dict__)) 78 if len(self.run_command) == 0: 79 LOG.info("No setup_command to run, skipping!") 80 return 81 for command in self.run_command: 82 self._run_command(command, device) 83 84 def __teardown__(self, device): 85 LOG.debug("CommandKit teardown: device:{}, params:{}".format 86 (device, self.get_plugin_config().__dict__)) 87 if len(self.teardown_command) == 0: 88 LOG.info("No teardown_command to run, skipping!") 89 return 90 for command in self.teardown_command: 91 self._run_command(command, device) 92 93 def _run_command(self, command, device): 94 95 command_type = command.get("name").strip() 96 command_value = command.get("value") 97 98 if command_type == "reboot": 99 device.reboot() 100 elif command_type == "install": 101 LOG.debug("Trying to install package {}".format(command_value)) 102 package = get_file_absolute_path(command_value, self.paths) 103 if not package or not os.path.exists(package): 104 LOG.error( 105 "The package {} to be installed does not exist".format( 106 package)) 107 108 result = device.install_package(package) 109 if not result.startswith("Success"): 110 raise AppInstallError( 111 "Failed to install %s on %s. Reason:%s" % 112 (package, device.__get_serial__(), result)) 113 LOG.debug("Installed package finished {}".format(package)) 114 elif command_type == "uninstall": 115 LOG.debug("Trying to uninstall package {}".format(command_value)) 116 package = get_file_absolute_path(command_value, self.paths) 117 app_name = get_app_name_by_tool(package, self.paths) 118 if app_name: 119 result = device.uninstall_package(app_name) 120 if not result.startswith("Success"): 121 LOG.error("error uninstalling package %s %s" % 122 (device.__get_serial__(), result)) 123 LOG.debug("uninstall package finished {}".format(app_name)) 124 elif command_type == "pull": 125 files = command_value.split("->") 126 remote = files[0].strip() 127 local = files[1].strip() 128 device.pull_file(remote, local) 129 elif command_type == "push": 130 files = command_value.split("->") 131 src = files[0].strip() 132 dst = files[1].strip() if files[1].strip().startswith("/") else \ 133 files[1].strip() + Props.dest_root 134 LOG.debug( 135 "Trying to push the file local {} to remote{}".format( 136 src, dst)) 137 real_src_path = get_file_absolute_path(src, self.paths) 138 if not real_src_path or not os.path.exists(real_src_path): 139 LOG.error( 140 "The src file {} to be pushed does not exist".format(src)) 141 device.push_file(real_src_path, dst) 142 LOG.debug("Push file finished from {} to {}".format(src, dst)) 143 elif command_type == "shell": 144 device.execute_shell_command(command_value) 145 146 147@Plugin(type=Plugin.TEST_KIT, id=CKit.sts) 148class STSKit(ITestKit): 149 def __init__(self): 150 self.sts_version = "" 151 self.throw_error = "" 152 153 def __check_config__(self, config): 154 self.sts_version = get_config_value('sts-version', config) 155 self.throw_error = get_config_value('throw-error', config) 156 if len(self.sts_version) < 1: 157 raise TypeError( 158 "The sts_version: {} is invalid".format(self.sts_version)) 159 160 def __setup__(self, device, **kwargs): 161 del kwargs 162 LOG.debug("STSKit setup, device:{}, params:{}". 163 format(device, self.get_plugin_config().__dict__)) 164 device_spl = device.get_property(Props.security_patch) 165 if device_spl is None or device_spl == "": 166 LOG.error("The device security {} is invalid".format(device_spl)) 167 raise ParamError( 168 "The device security patch version {} is invalid".format( 169 device_spl)) 170 rex = '^[a-zA-Z\\d\\.]+_([\\d]+-[\\d]+)$' 171 match = re.match(rex, self.sts_version) 172 if match is None: 173 LOG.error("The sts version {} does match the rule".format( 174 self.sts_version)) 175 raise ParamError("The sts version {} does match the rule".format( 176 self.sts_version)) 177 sts_version_date_user = match.group(1).join("-01") 178 sts_version_date_kernel = match.group(1).join("-05") 179 if device_spl in [sts_version_date_user, sts_version_date_kernel]: 180 LOG.info( 181 "The device SPL version {} match the sts version {}".format( 182 device_spl, self.sts_version)) 183 else: 184 err_msg = "The device SPL version {} does not match the sts " \ 185 "version {}".format(device_spl, self.sts_version) 186 LOG.error(err_msg) 187 raise ParamError(err_msg) 188 189 def __teardown__(self, device): 190 LOG.debug("STSKit teardown: device:{}, params:{}".format 191 (device, self.get_plugin_config().__dict__)) 192 193 194@Plugin(type=Plugin.TEST_KIT, id=CKit.push) 195class PushKit(ITestKit): 196 def __init__(self): 197 self.pre_push = "" 198 self.push_list = "" 199 self.post_push = "" 200 self.is_uninstall = "" 201 self.paths = "" 202 self.pushed_file = [] 203 self.abort_on_push_failure = True 204 self.teardown_push = "" 205 206 def __check_config__(self, config): 207 self.pre_push = get_config_value('pre-push', config) 208 self.push_list = get_config_value('push', config) 209 self.post_push = get_config_value('post-push', config) 210 self.teardown_push = get_config_value('teardown-push', config) 211 self.is_uninstall = get_config_value('uninstall', config, 212 is_list=False, default=True) 213 self.abort_on_push_failure = get_config_value( 214 'abort-on-push-failure', config, is_list=False, default=True) 215 if isinstance(self.abort_on_push_failure, str): 216 self.abort_on_push_failure = False if \ 217 self.abort_on_push_failure.lower() == "false" else True 218 219 self.paths = get_config_value('paths', config) 220 self.pushed_file = [] 221 222 def __setup__(self, device, **kwargs): 223 del kwargs 224 LOG.debug("PushKit setup, device:{}".format(device.device_sn)) 225 for command in self.pre_push: 226 run_command(device, command) 227 dst = None 228 remount(device) 229 for push_info in self.push_list: 230 files = re.split('->|=>', push_info) 231 if len(files) != 2: 232 LOG.error("The push spec is invalid: {}".format(push_info)) 233 continue 234 src = files[0].strip() 235 dst = files[1].strip() if files[1].strip().startswith("/") else \ 236 files[1].strip() + Props.dest_root 237 LOG.debug( 238 "Trying to push the file local {} to remote {}".format(src, 239 dst)) 240 241 try: 242 real_src_path = get_file_absolute_path(src, self.paths) 243 except ParamError as error: 244 if self.abort_on_push_failure: 245 raise error 246 else: 247 LOG.warning(error, error_no=error.error_no) 248 continue 249 # hdc don't support push directory now 250 if os.path.isdir(real_src_path): 251 device.connector_command("shell mkdir {}".format(dst)) 252 for root, _, files in os.walk(real_src_path): 253 for file in files: 254 device.push_file("{}".format(os.path.join(root, file)), 255 "{}".format(dst)) 256 LOG.debug( 257 "Push file finished from {} to {}".format( 258 os.path.join(root, file), dst)) 259 self.pushed_file.append(os.path.join(dst, file)) 260 else: 261 if device.is_directory(dst): 262 dst = os.path.join(dst, os.path.basename(real_src_path)) 263 if dst.find("\\") > -1: 264 dst_paths = dst.split("\\") 265 dst = "/".join(dst_paths) 266 device.push_file("{}".format(real_src_path), 267 "{}".format(dst)) 268 LOG.debug("Push file finished from {} to {}".format(src, dst)) 269 self.pushed_file.append(dst) 270 for command in self.post_push: 271 run_command(device, command) 272 return self.pushed_file, dst 273 274 def add_pushed_dir(self, src, dst): 275 for root, _, files in os.walk(src): 276 for file_path in files: 277 self.pushed_file.append( 278 os.path.join(root, file_path).replace(src, dst)) 279 280 def __teardown__(self, device): 281 LOG.debug("PushKit teardown: device:{}".format(device.device_sn)) 282 for command in self.teardown_push: 283 run_command(device, command) 284 if self.is_uninstall: 285 remount(device) 286 for file_name in self.pushed_file: 287 LOG.debug("Trying to remove file {}".format(file_name)) 288 file_name = file_name.replace("\\", "/") 289 290 for _ in range( 291 Props.trying_remove_maximum_times): 292 collect_receiver = CollectingOutputReceiver() 293 file_name = check_path_legal(file_name) 294 device.execute_shell_command("rm -rf {}".format( 295 file_name), receiver=collect_receiver, 296 output_flag=False) 297 if not collect_receiver.output: 298 LOG.debug( 299 "Removed file {} successfully".format(file_name)) 300 break 301 else: 302 LOG.error("Removed file {} successfully". 303 format(collect_receiver.output)) 304 else: 305 LOG.error("Failed to remove file {}".format(file_name)) 306 307 def __add_pushed_file__(self, device, src, dst): 308 if device.is_directory(dst): 309 dst = dst + os.path.basename(src) if dst.endswith( 310 "/") else dst + "/" + os.path.basename(src) 311 self.pushed_file.append(dst) 312 313 def __add_dir_pushed_files__(self, device, src, dst): 314 if device.file_exist(device, dst): 315 for _, dirs, files in os.walk(src): 316 for file_path in files: 317 if dst.endswith("/"): 318 dst = "%s%s" % (dst, os.path.basename(file_path)) 319 else: 320 dst = "%s/%s" % (dst, os.path.basename(file_path)) 321 self.pushed_file.append(dst) 322 for dir_name in dirs: 323 self.__add_dir_pushed_files__(device, dir_name, dst) 324 else: 325 self.pushed_file.append(dst) 326 327 328@Plugin(type=Plugin.TEST_KIT, id=CKit.propertycheck) 329class PropertyCheckKit(ITestKit): 330 def __init__(self): 331 self.prop_name = "" 332 self.expected_value = "" 333 self.throw_error = "" 334 335 def __check_config__(self, config): 336 self.prop_name = get_config_value('property-name', config, 337 is_list=False) 338 self.expected_value = get_config_value('expected-value', config, 339 is_list=False) 340 self.throw_error = get_config_value('throw-error', config, 341 is_list=False) 342 343 def __setup__(self, device, **kwargs): 344 del kwargs 345 LOG.debug("PropertyCheckKit setup, device:{}".format(device.device_sn)) 346 if not self.prop_name: 347 LOG.warning("The option of property-name not setting") 348 return 349 prop_value = device.get_property(self.prop_name) 350 if not prop_value: 351 LOG.warning( 352 "The property {} not found on device, cannot check the value". 353 format(self.prop_name)) 354 return 355 356 if prop_value != self.expected_value: 357 msg = "The value found for property {} is {}, not same with the " \ 358 "expected {}".format(self.prop_name, prop_value, 359 self.expected_value) 360 LOG.warning(msg) 361 if self.throw_error and self.throw_error.lower() == 'true': 362 raise Exception(msg) 363 364 @classmethod 365 def __teardown__(cls, device): 366 LOG.debug("PropertyCheckKit teardown: device:{}".format( 367 device.device_sn)) 368 369 370@Plugin(type=Plugin.TEST_KIT, id=CKit.shell) 371class ShellKit(ITestKit): 372 def __init__(self): 373 self.command_list = [] 374 self.tear_down_command = [] 375 self.paths = None 376 377 def __check_config__(self, config): 378 self.command_list = get_config_value('run-command', config) 379 self.tear_down_command = get_config_value('teardown-command', config) 380 self.tear_down_local_command = get_config_value('teardown-localcommand', config) 381 self.paths = get_config_value('paths', config) 382 383 def __setup__(self, device, **kwargs): 384 del kwargs 385 LOG.debug("ShellKit setup, device:{}".format(device.device_sn)) 386 if len(self.command_list) == 0: 387 LOG.info("No setup_command to run, skipping!") 388 return 389 for command in self.command_list: 390 run_command(device, command) 391 392 def __teardown__(self, device): 393 LOG.debug("ShellKit teardown: device:{}".format(device.device_sn)) 394 if len(self.tear_down_command) == 0: 395 LOG.info("No teardown_command to run, skipping!") 396 else: 397 for command in self.tear_down_command: 398 run_command(device, command) 399 if len(self.tear_down_local_command) == 0: 400 LOG.info("No teardown-localcommand to run, skipping!") 401 else: 402 for command in self.tear_down_local_command: 403 subprocess.run(command) 404 405 406@Plugin(type=Plugin.TEST_KIT, id=CKit.wifi) 407class WifiKit(ITestKit): 408 def __init__(self): 409 self.certfilename = "" 410 self.certpassword = "" 411 self.wifiname = "" 412 self.paths = "" 413 414 def __check_config__(self, config): 415 self.certfilename = get_config_value( 416 'certfilename', config, False, 417 default=None) 418 self.certpassword = get_config_value( 419 'certpassword', config, False, 420 default=None) 421 self.wifiname = get_config_value( 422 'wifiname', config, False, 423 default=None) 424 self.paths = get_config_value('paths', config) 425 426 def __setup__(self, device, **kwargs): 427 request = kwargs.get("request", None) 428 if not request: 429 LOG.error("WifiKit need input request") 430 return 431 testargs = request.get("testargs", {}) 432 self.certfilename = \ 433 testargs.pop("certfilename", [self.certfilename])[0] 434 self.wifiname = \ 435 testargs.pop("wifiname", [self.wifiname])[0] 436 self.certpassword = \ 437 testargs.pop("certpassword", [self.certpassword])[0] 438 del kwargs 439 LOG.debug("WifiKit setup, device:{}".format(device.device_sn)) 440 441 try: 442 wifi_app_path = get_file_absolute_path( 443 Props.Paths.service_wifi_app_path, self.paths) 444 except ParamError as _: 445 wifi_app_path = None 446 447 if wifi_app_path is None: 448 LOG.error("The resource wifi app file does not exist!") 449 return 450 451 try: 452 pfx_path = get_file_absolute_path( 453 "tools/wifi/%s" % self.certfilename 454 ) if self.certfilename else None 455 except ParamError as _: 456 pfx_path = None 457 458 if pfx_path is None: 459 LOG.error("The resource wifi pfx file does not exist!") 460 return 461 pfx_dest_path = \ 462 "/storage/emulated/0/%s" % self.certfilename 463 if self.wifiname is None: 464 LOG.error("The wifi name is not given!") 465 return 466 if self.certpassword is None: 467 LOG.error("The wifi password is not given!") 468 return 469 470 device.install_package(wifi_app_path, command="-r") 471 device.push_file(pfx_path, pfx_dest_path) 472 device.execute_shell_command("svc wifi enable") 473 for _ in range(Props.maximum_connect_wifi_times): 474 connect_wifi_cmd = Props.connect_wifi_cmd % ( 475 pfx_dest_path, 476 self.certpassword, 477 self.wifiname 478 ) 479 if device.execute_shell_command(connect_wifi_cmd): 480 LOG.info("Connect wifi successfully") 481 break 482 else: 483 LOG.error("Connect wifi failed") 484 485 @classmethod 486 def __teardown__(cls, device): 487 LOG.debug("WifiKit teardown: device:{}".format(device.device_sn)) 488 LOG.info("Disconnect wifi") 489 device.execute_shell_command("svc wifi disable") 490 491 492@dataclass 493class Props: 494 @dataclass 495 class Paths: 496 system_build_prop_path = "/%s/%s" % ("system", "build.prop") 497 service_wifi_app_path = "tools/wifi/%s" % "Service-wifi.app" 498 499 dest_root = "/%s/%s/" % ("data", "data") 500 mnt_external_storage = "EXTERNAL_STORAGE" 501 trying_remove_maximum_times = 3 502 maximum_connect_wifi_times = 3 503 connect_wifi_cmd = "am instrument -e request \"{module:Wifi, " \ 504 "method:connectWifiByCertificate, params:{'certPath':" \ 505 "'%s'," \ 506 "'certPassword':'%s'," \ 507 "'wifiName':'%s'}}\" " \ 508 "-w com.xdeviceservice.service/.MainInstrumentation" 509 security_patch = "ro.build.version.security_patch" 510 511 512@Plugin(type=Plugin.TEST_KIT, id=CKit.config) 513class ConfigKit(ITestKit): 514 def __init__(self): 515 self.is_connect_wifi = "" 516 self.is_disconnect_wifi = "" 517 self.wifi_kit = WifiKit() 518 self.min_external_store_space = "" 519 self.is_disable_dialing = "" 520 self.is_test_harness = "" 521 self.is_audio_silent = "" 522 self.is_disable_dalvik_verifier = "" 523 self.build_prop_list = "" 524 self.is_enable_hook = "" 525 self.cust_prop_file = "" 526 self.is_prop_changed = False 527 self.local_system_prop_file = "" 528 self.cust_props = "" 529 self.is_reboot_delay = "" 530 self.is_remount = "" 531 self.local_cust_prop_file = {} 532 533 def __check_config__(self, config): 534 self.is_connect_wifi = get_config_value('connect-wifi', config, 535 is_list=False, default=False) 536 self.is_disconnect_wifi = get_config_value( 537 'disconnect-wifi-after-test', config, is_list=False, default=True) 538 self.wifi_kit = WifiKit() 539 self.min_external_store_space = get_config_value( 540 'min-external-store-space', config) 541 self.is_disable_dialing = get_config_value('disable-dialing', config) 542 self.is_test_harness = get_config_value('set-test-harness', config) 543 self.is_audio_silent = get_config_value('audio-silent', config) 544 self.is_disable_dalvik_verifier = get_config_value( 545 'disable-dalvik-verifier', config) 546 self.build_prop_list = get_config_value('build-prop', config) 547 self.cust_prop_file = get_config_value('cust-prop-file', config) 548 self.cust_props = get_config_value('cust-prop', config) 549 self.is_enable_hook = get_config_value('enable-hook', config) 550 self.is_reboot_delay = get_config_value('reboot-delay', config) 551 self.is_remount = get_config_value('remount', config, default=True) 552 self.local_system_prop_file = NamedTemporaryFile(prefix='build', 553 suffix='.prop', 554 delete=False).name 555 556 def __setup__(self, device, **kwargs): 557 del kwargs 558 LOG.debug("ConfigKit setup, device:{}".format(device.device_sn)) 559 if self.is_remount: 560 remount(device) 561 self.is_prop_changed = self.modify_system_prop(device) 562 self.is_prop_changed = self.modify_cust_prop( 563 device) or self.is_prop_changed 564 565 keep_screen_on(device) 566 if self.is_enable_hook: 567 pass 568 if self.is_prop_changed: 569 device.reboot() 570 571 def __teardown__(self, device): 572 LOG.debug("ConfigKit teardown: device:{}".format(device.device_sn)) 573 if self.is_remount: 574 remount(device) 575 if self.is_connect_wifi and self.is_disconnect_wifi: 576 self.wifi_kit.__teardown__(device) 577 if self.is_prop_changed: 578 device.push_file(self.local_system_prop_file, 579 Props.Paths.system_build_prop_path) 580 device.execute_shell_command( 581 " ".join(["chmod 644", Props.Paths.system_build_prop_path])) 582 os.remove(self.local_system_prop_file) 583 584 for target_file, temp_file in self.local_cust_prop_file.items(): 585 device.push_file(temp_file, target_file) 586 device.execute_shell_command( 587 " ".join(["chmod 644", target_file])) 588 os.remove(temp_file) 589 590 def modify_system_prop(self, device): 591 prop_changed = False 592 new_props = {} 593 if self.is_disable_dialing: 594 new_props['ro.telephony.disable-call'] = 'true' 595 if self.is_test_harness: 596 new_props['ro.monkey'] = '1' 597 new_props['ro.test_harness'] = '1' 598 if self.is_audio_silent: 599 new_props['ro.audio.silent'] = '1' 600 if self.is_disable_dalvik_verifier: 601 new_props['dalvik.vm.dexopt-flags'] = 'v=n' 602 for prop in self.build_prop_list: 603 if prop is None or prop.find("=") < 0 or len(prop.split("=")) != 2: 604 LOG.warning("The build prop:{} not match the format " 605 "'key=value'".format(prop)) 606 continue 607 new_props[prop.split("=")[0]] = prop.split("=")[1] 608 if new_props: 609 prop_changed = modify_props(device, self.local_system_prop_file, 610 Props.Paths.system_build_prop_path, 611 new_props) 612 return prop_changed 613 614 def modify_cust_prop(self, device): 615 prop_changed = False 616 cust_files = {} 617 new_props = {} 618 for cust_prop_file in self.cust_prop_file: 619 # the correct format should be "CustName:/cust/prop/absolutepath" 620 if len(cust_prop_file.split(":")) != 2: 621 LOG.error( 622 "The value %s of option cust-prop-file is incorrect" % 623 cust_prop_file) 624 continue 625 cust_files[cust_prop_file.split(":")[0]] = \ 626 cust_prop_file.split(":")[1] 627 for prop in self.cust_props: 628 # the correct format should be "CustName:key=value" 629 prop_infos = re.split(r'[:|=]', prop) 630 if len(prop_infos) != 3: 631 LOG.error( 632 "The value {} of option cust-prop is incorrect".format( 633 prop)) 634 continue 635 file_name, key, value = prop_infos 636 if file_name not in cust_files: 637 LOG.error( 638 "The custName {} must be in cust-prop-file option".format( 639 file_name)) 640 continue 641 props = new_props.setdefault(file_name, {}) 642 props[key] = value 643 644 for name in new_props.keys(): 645 cust_file = cust_files.get(name) 646 temp_cust_file = NamedTemporaryFile(prefix='cust', suffix='.prop', 647 delete=False).name 648 self.local_cust_prop_file[cust_file] = temp_cust_file 649 try: 650 prop_changed = modify_props(device, temp_cust_file, cust_file, 651 new_props[name]) or prop_changed 652 except KeyError: 653 LOG.error("Get props error.") 654 continue 655 656 return prop_changed 657 658 659@Plugin(type=Plugin.TEST_KIT, id=CKit.app_install) 660class AppInstallKit(ITestKit): 661 def __init__(self): 662 self.app_list = "" 663 self.app_list_name = "" 664 self.is_clean = "" 665 self.alt_dir = "" 666 self.ex_args = "" 667 self.installed_app = set() 668 self.paths = "" 669 self.is_pri_app = "" 670 self.pushed_hap_file = set() 671 self.env_index_list = None 672 673 def __check_config__(self, options): 674 self.app_list = get_config_value('test-file-name', options) 675 self.app_list_name = get_config_value('test-file-packName', options) 676 self.is_clean = get_config_value('cleanup-apps', options, False) 677 self.alt_dir = get_config_value('alt-dir', options, False) 678 if self.alt_dir and self.alt_dir.startswith("resource/"): 679 self.alt_dir = self.alt_dir[len("resource/"):] 680 self.ex_args = get_config_value('install-arg', options, False) 681 self.installed_app = set() 682 self.paths = get_config_value('paths', options) 683 self.is_pri_app = get_config_value('install-as-privapp', options, 684 False, default=False) 685 self.env_index_list = get_config_value('env-index', options) 686 687 def __setup__(self, device, **kwargs): 688 del kwargs 689 LOG.debug("AppInstallKit setup, device:{}".format(device.device_sn)) 690 if len(self.app_list) == 0: 691 LOG.info("No app to install, skipping!") 692 return 693 # to disable app install alert 694 device.execute_shell_command("setprop persist.sys.platformautotest 1") 695 for app in self.app_list: 696 if self.alt_dir: 697 app_file = get_file_absolute_path(app, self.paths, 698 self.alt_dir) 699 else: 700 app_file = get_file_absolute_path(app, self.paths) 701 if app_file is None: 702 LOG.error("The app file {} does not exist".format(app)) 703 continue 704 if hasattr(device, "is_harmony") and device.is_harmony: 705 device.connector_command("install \"{}\"".format(app_file)) 706 else: 707 self.install_hap(device, app_file) 708 self.installed_app.add(app_file) 709 710 def __teardown__(self, device): 711 LOG.debug("AppInstallKit teardown: device:{}".format(device.device_sn)) 712 if self.is_clean and str(self.is_clean).lower() == "true": 713 if self.app_list_name and len(self.app_list_name) > 0: 714 for app_name in self.app_list_name: 715 result = device.uninstall_package(app_name) 716 if result and (result.startswith("Success") or "successfully" in result): 717 LOG.debug("uninstalling package Success. result is %s" % 718 result) 719 else: 720 LOG.warning("Error uninstalling package %s %s" % 721 (device.__get_serial__(), result)) 722 else: 723 for app in self.installed_app: 724 app_name = get_app_name(app) 725 if app_name: 726 result = device.uninstall_package(app_name) 727 if result and (result.startswith("Success") or "successfully" in result): 728 LOG.debug("uninstalling package Success. result is %s" % 729 result) 730 else: 731 LOG.warning("Error uninstalling package %s %s" % 732 (device.__get_serial__(), result)) 733 else: 734 LOG.warning("Can't find app name for %s" % app) 735 if self.is_pri_app: 736 remount(device) 737 for pushed_file in self.pushed_hap_file: 738 device.execute_shell_command("rm -r %s" % pushed_file) 739 740 def install_hap(self, device, hap_file): 741 if self.is_pri_app: 742 LOG.info("Install hap as privileged app {}".format(hap_file)) 743 hap_name = os.path.basename(hap_file).replace(".hap", "") 744 try: 745 with TemporaryDirectory(prefix=hap_name) as temp_dir: 746 zif_file = zipfile.ZipFile(hap_file) 747 zif_file.extractall(path=temp_dir) 748 entry_app = os.path.join(temp_dir, "Entry.app") 749 push_dest_dir = os.path.join("/system/priv-app/", hap_name) 750 device.execute_shell_command("rm -rf " + push_dest_dir, 751 output_flag=False) 752 device.push_file(entry_app, os.path.join( 753 push_dest_dir + os.path.basename(entry_app))) 754 device.push_file(hap_file, os.path.join( 755 push_dest_dir + os.path.basename(hap_file))) 756 self.pushed_hap_file.add(os.path.join( 757 push_dest_dir + os.path.basename(hap_file))) 758 device.reboot() 759 except RuntimeError as exception: 760 msg = "Install hap app failed withe error {}".format(exception) 761 LOG.error(msg) 762 raise Exception(msg) 763 except Exception as exception: 764 msg = "Install hap app failed withe exception {}".format( 765 exception) 766 LOG.error(msg) 767 raise Exception(msg) 768 finally: 769 zif_file.close() 770 else: 771 push_dest = "/%s" % "sdcard" 772 push_dest = "%s/%s" % (push_dest, os.path.basename(hap_file)) 773 device.push_file(hap_file, push_dest) 774 self.pushed_hap_file.add(push_dest) 775 output = device.execute_shell_command("bm install -p " + push_dest) 776 if not output.startswith("Success") and not "successfully" in output: 777 output = output.strip() 778 if "[ERROR_GET_BUNDLE_INSTALLER_FAILED]" not in output.upper(): 779 raise AppInstallError( 780 "Failed to install %s on %s. Reason:%s" % 781 (push_dest, device.__get_serial__(), output)) 782 else: 783 LOG.info("'[ERROR_GET_BUNDLE_INSTALLER_FAILED]' occurs, " 784 "retry install hap") 785 exec_out = self.retry_install_hap( 786 device, "bm install -p " + push_dest) 787 if not exec_out.startswith("Success") and not "successfully" in output: 788 raise AppInstallError( 789 "Retry failed,Can't install %s on %s. Reason:%s" % 790 (push_dest, device.__get_serial__(), exec_out)) 791 else: 792 LOG.debug("Install %s success" % push_dest) 793 794 @classmethod 795 def retry_install_hap(cls, device, command): 796 real_command = [HdcHelper.CONNECTOR_NAME, "-t", str(device.device_sn), "-s", 797 "tcp:%s:%s" % (str(device.host), str(device.port)), 798 "shell", command] 799 message = "%s execute command: %s" % \ 800 (convert_serial(device.device_sn), " ".join(real_command)) 801 LOG.info(message) 802 exec_out = "" 803 for wait_count in range(1, MAX_WAIT_COUNT): 804 LOG.debug("Retry times:%s, wait %ss" % 805 (wait_count, (wait_count * 10))) 806 time.sleep(wait_count * 10) 807 exec_out = exec_cmd(real_command) 808 if exec_out and exec_out.startswith("Success"): 809 break 810 if not exec_out: 811 exec_out = "System is not in %s" % ["Windows", "Linux", "Darwin"] 812 LOG.info("Retry install hap result is: [%s]" % exec_out.strip()) 813 return exec_out 814 815 816@Plugin(type=Plugin.TEST_KIT, id=CKit.component) 817class ComponentKit(ITestKit): 818 819 def __init__(self): 820 self._white_list_file = "" 821 self._white_list = "" 822 self._cap_file = "" 823 self.paths = "" 824 self.cache_subsystem = set() 825 self.cache_part = set() 826 827 def __check_config__(self, config): 828 self._white_list_file =\ 829 get_config_value('white-list', config, is_list=False) 830 self._cap_file = get_config_value('cap-file', config, is_list=False) 831 self.paths = get_config_value('paths', config) 832 833 def __setup__(self, device, **kwargs): 834 if hasattr(device, ConfigConst.support_component): 835 return 836 if device.label in ["phone", "watch", "car", "tv", "tablet", "ivi"]: 837 command = "cat %s" % self._cap_file 838 result = device.execute_shell_command(command) 839 part_set = set() 840 subsystem_set = set() 841 if "{" in result: 842 for item in json.loads(result).get("components", []): 843 part_set.add(item.get("component", "")) 844 subsystems, parts = self.get_white_list() 845 part_set.update(parts) 846 subsystem_set.update(subsystems) 847 setattr(device, ConfigConst.support_component, 848 (subsystem_set, part_set)) 849 self.cache_subsystem.update(subsystem_set) 850 self.cache_part.update(part_set) 851 852 def get_cache(self): 853 return self.cache_subsystem, self.cache_part 854 855 def get_white_list(self): 856 if not self._white_list and self._white_list_file: 857 self._white_list = self._parse_white_list() 858 return self._white_list 859 860 def _parse_white_list(self): 861 subsystem = set() 862 part = set() 863 white_json_file = os.path.normpath(self._white_list_file) 864 if not os.path.isabs(white_json_file): 865 white_json_file = \ 866 get_file_absolute_path(white_json_file, self.paths) 867 if os.path.isfile(white_json_file): 868 subsystem_list = list() 869 flags = os.O_RDONLY 870 modes = stat.S_IWUSR | stat.S_IRUSR 871 with os.fdopen(os.open(white_json_file, flags, modes), 872 "r") as file_content: 873 json_result = json.load(file_content) 874 if "subsystems" in json_result.keys(): 875 subsystem_list.extend(json_result["subsystems"]) 876 for subsystem_item_list in subsystem_list: 877 for key, value in subsystem_item_list.items(): 878 if key == "subsystem": 879 subsystem.add(value) 880 elif key == "components": 881 for component_item in value: 882 if "component" in component_item.keys(): 883 part.add( 884 component_item["component"]) 885 886 return subsystem, part 887 888 def __teardown__(self, device): 889 if hasattr(device, ConfigConst.support_component): 890 setattr(device, ConfigConst.support_component, None) 891 self._white_list_file = "" 892 self._white_list = "" 893 self._cap_file = "" 894 self.cache_subsystem.clear() 895 self.cache_part.clear() 896 self.cache_device.clear() 897 898 899@Plugin(type=Plugin.TEST_KIT , id=CKit.permission) 900class PermissionKit(ITestKit): 901 def __init__(self): 902 self.package_name_list = None 903 self.permission_list = None 904 905 def __check_config__(self, config): 906 self.package_name_list = \ 907 get_config_value('package-names', config, True, []) 908 self.permission_list = \ 909 get_config_value('permissions', config, True, []) 910 911 def __setup__(self, device, **kwargs): 912 if not self.package_name_list or not self.permission_list: 913 LOG.warning("Please check parameters of permission kit in json") 914 return 915 for index in range(len(self.package_name_list)): 916 cur_name = self.package_name_list[index] 917 token_id = self._get_token_id(device, cur_name) 918 if not token_id: 919 LOG.warning("Not found accessTokenId of '{}'".format(cur_name)) 920 continue 921 for permission in self.permission_list[index]: 922 command = "atm perm -g -i {} -p {}".format(token_id, 923 permission) 924 out = device.execute_shell_command(command) 925 LOG.debug("Set permission result: {}".format(out)) 926 927 def __teardown__(self, device): 928 pass 929 930 def _get_token_id(self, device, pkg_name): 931 # shell bm dump -n 932 dump_command = "bm dump -n {}".format(pkg_name) 933 content = device.execute_shell_command(dump_command) 934 if not content or not str(content).startswith(pkg_name): 935 return "" 936 content = content[len(pkg_name) + len(":\n"):] 937 dump_dict = json.loads(content) 938 if "userInfo" not in dump_dict.keys(): 939 return "" 940 user_info_dict = dump_dict["userInfo"][0] 941 if "accessTokenId" not in user_info_dict.keys(): 942 return "" 943 else: 944 return user_info_dict["accessTokenId"] 945 946 947def keep_screen_on(device): 948 device.execute_shell_command("svc power stayon true") 949 950 951def run_command(device, command): 952 LOG.debug("The command:{} is running".format(command)) 953 stdout = None 954 if command.strip() == "remount": 955 remount(device) 956 elif command.strip() == "reboot": 957 device.reboot() 958 elif command.strip() == "reboot-delay": 959 pass 960 elif command.strip().endswith("&"): 961 device.execute_shell_in_daemon(command.strip()) 962 else: 963 stdout = device.execute_shell_command(command) 964 LOG.debug("Run command result: %s" % (stdout if stdout else "")) 965 return stdout 966 967 968def junit_dex_para_parse(device, junit_paras, prefix_char="--"): 969 """To parse the para of junit 970 Args: 971 device: the device running 972 junit_paras: the para dict of junit 973 prefix_char: the prefix char of parsed cmd 974 Returns: 975 the new para using in a command like -e testFile xxx 976 -e coverage true... 977 """ 978 ret_str = [] 979 path = "/%s/%s/%s" % ("data", "local", "ajur") 980 include_file = "%s/%s" % (path, "includes.txt") 981 exclude_file = "%s/%s" % (path, "excludes.txt") 982 983 if not isinstance(junit_paras, dict): 984 LOG.warning("The para of junit is not the dict format as required") 985 return "" 986 # Disable screen keyguard 987 disable_key_guard = junit_paras.get('disable-keyguard') 988 if not disable_key_guard or disable_key_guard[0].lower() != 'false': 989 disable_keyguard(device) 990 991 for para_name in junit_paras.keys(): 992 path = "/%s/%s/%s/" % ("data", "local", "ajur") 993 if para_name.strip() == 'test-file-include-filter': 994 for file_name in junit_paras[para_name]: 995 device.push_file(file_name, include_file) 996 device.execute_shell_command( 997 'chown -R shell:shell %s' % path) 998 ret_str.append(prefix_char + " ".join(['testFile', include_file])) 999 elif para_name.strip() == "test-file-exclude-filter": 1000 for file_name in junit_paras[para_name]: 1001 device.push_file(file_name, include_file) 1002 device.execute_shell_command( 1003 'chown -R shell:shell %s' % path) 1004 ret_str.append(prefix_char + " ".join(['notTestFile', 1005 exclude_file])) 1006 elif para_name.strip() == "test" or para_name.strip() == "class": 1007 result = get_class(junit_paras, prefix_char, para_name.strip()) 1008 ret_str.append(result) 1009 elif para_name.strip() == "include-annotation": 1010 ret_str.append(prefix_char + " ".join( 1011 ['annotation', ",".join(junit_paras[para_name])])) 1012 elif para_name.strip() == "exclude-annotation": 1013 ret_str.append(prefix_char + " ".join( 1014 ['notAnnotation', ",".join(junit_paras[para_name])])) 1015 else: 1016 ret_str.append(prefix_char + " ".join( 1017 [para_name, ",".join(junit_paras[para_name])])) 1018 1019 return " ".join(ret_str) 1020 1021 1022def get_app_name(hap_app): 1023 hap_name = os.path.basename(hap_app).replace(".hap", "") 1024 app_name = "" 1025 hap_file_info = None 1026 config_json_file = "" 1027 try: 1028 hap_file_info = zipfile.ZipFile(hap_app) 1029 name_list = ["module.json", "config.json"] 1030 for _, name in enumerate(hap_file_info.namelist()): 1031 if name in name_list: 1032 config_json_file = name 1033 break 1034 config_info = hap_file_info.read(config_json_file).decode('utf-8') 1035 attrs = json.loads(config_info) 1036 if "app" in attrs.keys() and \ 1037 "bundleName" in attrs.get("app", dict()).keys(): 1038 app_name = attrs["app"]["bundleName"] 1039 LOG.info("Obtain the app name {} from json " 1040 "successfully".format(app_name)) 1041 else: 1042 LOG.debug("Tip: 'app' or 'bundleName' not " 1043 "in %s.hap/config.json" % hap_name) 1044 except Exception as e: 1045 LOG.error("get app name from hap error: {}".format(e)) 1046 finally: 1047 if hap_file_info: 1048 hap_file_info.close() 1049 return app_name 1050 1051 1052def oh_jsunit_para_parse(runner, junit_paras): 1053 junit_paras = dict(junit_paras) 1054 test_type_list = ["function", "performance", "reliability", "security"] 1055 size_list = ["small", "medium", "large"] 1056 level_list = ["0", "1", "2", "3"] 1057 for para_name in junit_paras.keys(): 1058 para_name = para_name.strip() 1059 para_values = junit_paras.get(para_name, []) 1060 if para_name == "class": 1061 runner.add_arg(para_name, ",".join(para_values)) 1062 elif para_name == "notClass": 1063 runner.add_arg(para_name, ",".join(para_values)) 1064 elif para_name == "testType": 1065 if para_values[0] not in test_type_list: 1066 continue 1067 # function/performance/reliability/security 1068 runner.add_arg(para_name, para_values[0]) 1069 elif para_name == "size": 1070 if para_values[0] not in size_list: 1071 continue 1072 # size small/medium/large 1073 runner.add_arg(para_name, para_values[0]) 1074 elif para_name == "level": 1075 if para_values[0] not in level_list: 1076 continue 1077 # 0/1/2/3/4 1078 runner.add_arg(para_name, para_values[0]) 1079 elif para_name == "stress": 1080 runner.add_arg(para_name, para_values[0]) 1081