1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import platform 21import re 22import signal 23import subprocess 24import zipfile 25import stat 26import time 27import json 28from dataclasses import dataclass 29from tempfile import TemporaryDirectory 30from tempfile import NamedTemporaryFile 31 32from xdevice import ITestKit 33from xdevice import platform_logger 34from xdevice import Plugin 35from xdevice import ParamError 36from xdevice import get_file_absolute_path 37from xdevice import get_config_value 38from xdevice import exec_cmd 39from xdevice import ConfigConst 40 41from xdevice_extension._core.constants import CKit 42from xdevice_extension._core.environment.dmlib import CollectingOutputReceiver 43from xdevice_extension._core.utils import check_path_legal 44from xdevice_extension._core.utils import modify_props 45from xdevice_extension._core.exception import AppInstallError 46from xdevice_extension._core.constants import DeviceConnectorType 47from xdevice_extension._core.utils import convert_serial 48 49 50__all__ = ["STSKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit", 51 "ConfigKit", "AppInstallKit", "ComponentKit", "junit_para_parse", 52 "gtest_para_parse", "reset_junit_para", "oh_jsunit_para_parse"] 53 54LOG = platform_logger("Kit") 55 56 57@Plugin(type=Plugin.TEST_KIT, id=CKit.sts) 58class STSKit(ITestKit): 59 def __init__(self): 60 self.sts_version = "" 61 self.throw_error = "" 62 63 def __check_config__(self, config): 64 self.sts_version = get_config_value('sts-version', config) 65 self.throw_error = get_config_value('throw-error', config) 66 if len(self.sts_version) < 1: 67 raise TypeError( 68 "The sts_version: {} is invalid".format(self.sts_version)) 69 70 def __setup__(self, device, **kwargs): 71 del kwargs 72 LOG.debug("STSKit setup, device:{}, params:{}". 73 format(device, self.get_plugin_config().__dict__)) 74 device_spl = device.get_property(Props.security_patch) 75 if device_spl is None or device_spl == "": 76 LOG.error("The device security {} is invalid".format(device_spl)) 77 raise ParamError( 78 "The device security patch version {} is invalid".format( 79 device_spl)) 80 rex = '^[a-zA-Z\\d\\.]+_([\\d]+-[\\d]+)$' 81 match = re.match(rex, self.sts_version) 82 if match is None: 83 LOG.error("The sts version {} does match the rule".format( 84 self.sts_version)) 85 raise ParamError("The sts version {} does match the rule".format( 86 self.sts_version)) 87 sts_version_date_user = match.group(1).join("-01") 88 sts_version_date_kernel = match.group(1).join("-05") 89 if device_spl in [sts_version_date_user, sts_version_date_kernel]: 90 LOG.info( 91 "The device SPL version {} match the sts version {}".format( 92 device_spl, self.sts_version)) 93 else: 94 err_msg = "The device SPL version {} does not match the sts " \ 95 "version {}".format(device_spl, self.sts_version) 96 LOG.error(err_msg) 97 raise ParamError(err_msg) 98 99 def __teardown__(self, device): 100 LOG.debug("STSKit teardown: device:{}, params:{}".format 101 (device, self.get_plugin_config().__dict__)) 102 103 104@Plugin(type=Plugin.TEST_KIT, id=CKit.push) 105class PushKit(ITestKit): 106 def __init__(self): 107 self.pre_push = "" 108 self.push_list = "" 109 self.post_push = "" 110 self.is_uninstall = "" 111 self.paths = "" 112 self.pushed_file = [] 113 self.abort_on_push_failure = True 114 115 def __check_config__(self, config): 116 self.pre_push = get_config_value('pre-push', config) 117 self.push_list = get_config_value('push', config) 118 self.post_push = get_config_value('post-push', config) 119 self.is_uninstall = get_config_value('uninstall', config, 120 is_list=False, default=True) 121 self.abort_on_push_failure = get_config_value( 122 'abort-on-push-failure', config, is_list=False, default=True) 123 if isinstance(self.abort_on_push_failure, str): 124 self.abort_on_push_failure = False if \ 125 self.abort_on_push_failure.lower() == "false" else True 126 127 self.paths = get_config_value('paths', config) 128 self.pushed_file = [] 129 130 def __setup__(self, device, **kwargs): 131 del kwargs 132 LOG.debug("PushKit setup, device:{}".format(device.device_sn)) 133 for command in self.pre_push: 134 run_command(device, command) 135 dst = None 136 for push_info in self.push_list: 137 files = re.split('->|=>', push_info) 138 if len(files) != 2: 139 LOG.error("The push spec is invalid: {}".format(push_info)) 140 continue 141 src = files[0].strip() 142 dst = files[1].strip() if files[1].strip().startswith("/") else \ 143 files[1].strip() + Props.dest_root 144 LOG.debug( 145 "Trying to push the file local {} to remote {}".format(src, 146 dst)) 147 148 try: 149 real_src_path = get_file_absolute_path(src, self.paths) 150 except ParamError as error: 151 if self.abort_on_push_failure: 152 raise 153 else: 154 LOG.warning(error, error_no=error.error_no) 155 continue 156 remount(device) 157 # hdc don't support push directory now 158 if os.path.isdir(real_src_path): 159 device.hdc_command("shell mkdir {}".format(dst)) 160 for root, _, files in os.walk(real_src_path): 161 for file in files: 162 device.hdc_command( 163 "file send {} {}".format(os.path.join(root, file), 164 dst)) 165 LOG.debug( 166 "Push file finished from {} to {}".format( 167 os.path.join(root, file), dst)) 168 self.pushed_file.append(os.path.join(dst, file)) 169 else: 170 device.hdc_command("file send {} {}".format(real_src_path, 171 dst)) 172 LOG.debug("Push file finished from {} to {}".format(src, dst)) 173 self.pushed_file.append(dst) 174 for command in self.post_push: 175 run_command(device, command) 176 return self.pushed_file, dst 177 178 def add_pushed_dir(self, src, dst): 179 for root, _, files in os.walk(src): 180 for file_path in files: 181 self.pushed_file.append( 182 os.path.join(root, file_path).replace(src, dst)) 183 184 def __teardown__(self, device): 185 LOG.debug("PushKit teardown: device:{}".format(device.device_sn)) 186 if self.is_uninstall: 187 remount(device) 188 for file_name in self.pushed_file: 189 LOG.debug("Trying to remove file {}".format(file_name)) 190 file_name = file_name.replace("\\", "/") 191 192 for _ in range( 193 Props.trying_remove_maximum_times): 194 collect_receiver = CollectingOutputReceiver() 195 file_name = check_path_legal(file_name) 196 device.execute_shell_command("rm -rf {}".format( 197 file_name), receiver=collect_receiver, 198 output_flag=False) 199 if not collect_receiver.output: 200 LOG.debug( 201 "Removed file {} successfully".format(file_name)) 202 break 203 else: 204 LOG.error("Removed file {} successfully". 205 format(collect_receiver.output)) 206 else: 207 LOG.error("Failed to remove file {}".format(file_name)) 208 209 def __add_pushed_file__(self, device, src, dst): 210 if device.is_directory(dst): 211 dst = dst + os.path.basename(src) if dst.endswith( 212 "/") else dst + "/" + os.path.basename(src) 213 self.pushed_file.append(dst) 214 215 def __add_dir_pushed_files__(self, device, src, dst): 216 if device.file_exist(device, dst): 217 for _, dirs, files in os.walk(src): 218 for file_path in files: 219 if dst.endswith("/"): 220 dst = "%s%s" % (dst, os.path.basename(file_path)) 221 else: 222 dst = "%s/%s" % (dst, os.path.basename(file_path)) 223 self.pushed_file.append(dst) 224 for dir_name in dirs: 225 self.__add_dir_pushed_files__(device, dir_name, dst) 226 else: 227 self.pushed_file.append(dst) 228 229 230@Plugin(type=Plugin.TEST_KIT, id=CKit.propertycheck) 231class PropertyCheckKit(ITestKit): 232 def __init__(self): 233 self.prop_name = "" 234 self.expected_value = "" 235 self.throw_error = "" 236 237 def __check_config__(self, config): 238 self.prop_name = get_config_value('property-name', config, 239 is_list=False) 240 self.expected_value = get_config_value('expected-value', config, 241 is_list=False) 242 self.throw_error = get_config_value('throw-error', config, 243 is_list=False) 244 245 def __setup__(self, device, **kwargs): 246 del kwargs 247 LOG.debug("PropertyCheckKit setup, device:{}".format(device.device_sn)) 248 if not self.prop_name: 249 LOG.warning("The option of property-name not setting") 250 return 251 prop_value = device.get_property(self.prop_name) 252 if not prop_value: 253 LOG.warning( 254 "The property {} not found on device, cannot check the value". 255 format(self.prop_name)) 256 return 257 258 if prop_value != self.expected_value: 259 msg = "The value found for property {} is {}, not same with the " \ 260 "expected {}".format(self.prop_name, prop_value, 261 self.expected_value) 262 LOG.warning(msg) 263 if self.throw_error and self.throw_error.lower() == 'true': 264 raise Exception(msg) 265 266 @classmethod 267 def __teardown__(cls, device): 268 LOG.debug("PropertyCheckKit teardown: device:{}".format( 269 device.device_sn)) 270 271 272@Plugin(type=Plugin.TEST_KIT, id=CKit.shell) 273class ShellKit(ITestKit): 274 def __init__(self): 275 self.command_list = [] 276 self.tear_down_command = [] 277 self.paths = None 278 279 def __check_config__(self, config): 280 self.command_list = get_config_value('run-command', config) 281 self.tear_down_command = get_config_value('teardown-command', config) 282 self.paths = get_config_value('paths', config) 283 284 def __setup__(self, device, **kwargs): 285 del kwargs 286 LOG.debug("ShellKit setup, device:{}".format(device.device_sn)) 287 if len(self.command_list) == 0: 288 LOG.info("No setup_command to run, skipping!") 289 return 290 for command in self.command_list: 291 run_command(device, command) 292 293 def __teardown__(self, device): 294 LOG.debug("ShellKit teardown: device:{}".format(device.device_sn)) 295 if len(self.tear_down_command) == 0: 296 LOG.info("No teardown_command to run, skipping!") 297 return 298 for command in self.tear_down_command: 299 run_command(device, command) 300 301 302@Plugin(type=Plugin.TEST_KIT, id=CKit.wifi) 303class WifiKit(ITestKit): 304 def __init__(self): 305 self.certfilename = "" 306 self.certpassword = "" 307 self.wifiname = "" 308 309 def __check_config__(self, config): 310 self.certfilename = get_config_value( 311 'certfilename', config, False, 312 default=None) 313 self.certpassword = get_config_value( 314 'certpassword', config, False, 315 default=None) 316 self.wifiname = get_config_value( 317 'wifiname', config, False, 318 default=None) 319 320 def __setup__(self, device, **kwargs): 321 request = kwargs.get("request", None) 322 if not request: 323 LOG.error("WifiKit need input request") 324 return 325 testargs = request.get("testargs", {}) 326 self.certfilename = \ 327 testargs.pop("certfilename", [self.certfilename])[0] 328 self.wifiname = \ 329 testargs.pop("wifiname", [self.wifiname])[0] 330 self.certpassword = \ 331 testargs.pop("certpassword", [self.certpassword])[0] 332 del kwargs 333 LOG.debug("WifiKit setup, device:{}".format(device.device_sn)) 334 335 try: 336 wifi_app_path = get_file_absolute_path( 337 Props.Paths.service_wifi_app_path) 338 except ParamError: 339 wifi_app_path = None 340 341 if wifi_app_path is None: 342 LOG.error("The resource wifi app file does not exist!") 343 return 344 345 try: 346 pfx_path = get_file_absolute_path( 347 "tools/wifi/%s" % self.certfilename 348 ) if self.certfilename else None 349 except ParamError: 350 pfx_path = None 351 352 if pfx_path is None: 353 LOG.error("The resource wifi pfx file does not exist!") 354 return 355 pfx_dest_path = \ 356 "/storage/emulated/0/%s" % self.certfilename 357 if self.wifiname is None: 358 LOG.error("The wifi name is not given!") 359 return 360 if self.certpassword is None: 361 LOG.error("The wifi password is not given!") 362 return 363 364 device.install_package(wifi_app_path, command="-r") 365 device.push_file(pfx_path, pfx_dest_path) 366 device.execute_shell_command("svc wifi enable") 367 for _ in range(Props.maximum_connect_wifi_times): 368 connect_wifi_cmd = Props.connect_wifi_cmd % ( 369 pfx_dest_path, 370 self.certpassword, 371 self.wifiname 372 ) 373 if device.execute_shell_command(connect_wifi_cmd): 374 LOG.info("Connect wifi successfully") 375 break 376 else: 377 LOG.error("Connect wifi failed") 378 379 @classmethod 380 def __teardown__(cls, device): 381 LOG.debug("WifiKit teardown: device:{}".format(device.device_sn)) 382 LOG.info("Disconnect wifi") 383 device.execute_shell_command("svc wifi disable") 384 385 386@dataclass 387class Props: 388 @dataclass 389 class Paths: 390 system_build_prop_path = "/%s/%s" % ("system", "build.prop") 391 service_wifi_app_path = "tools/wifi/%s" % "Service-wifi.app" 392 393 dest_root = "/%s/%s/" % ("data", "data") 394 mnt_external_storage = "EXTERNAL_STORAGE" 395 trying_remove_maximum_times = 3 396 maximum_connect_wifi_times = 3 397 connect_wifi_cmd = "am instrument -e request \"{module:Wifi, " \ 398 "method:connectWifiByCertificate, params:{'certPath':" \ 399 "'%s'," \ 400 "'certPassword':'%s'," \ 401 "'wifiName':'%s'}}\" " \ 402 "-w com.xdeviceservice.service/.MainInstrumentation" 403 security_patch = "ro.build.version.security_patch" 404 405 406@Plugin(type=Plugin.TEST_KIT, id=CKit.config) 407class ConfigKit(ITestKit): 408 def __init__(self): 409 self.is_connect_wifi = "" 410 self.is_disconnect_wifi = "" 411 self.wifi_kit = WifiKit() 412 self.min_external_store_space = "" 413 self.is_disable_dialing = "" 414 self.is_test_harness = "" 415 self.is_audio_silent = "" 416 self.build_prop_list = "" 417 self.is_enable_hook = "" 418 self.cust_prop_file = "" 419 self.is_prop_changed = False 420 self.local_system_prop_file = "" 421 self.cust_props = "" 422 self.is_reboot_delay = "" 423 self.is_remount = "" 424 self.local_cust_prop_file = {} 425 426 def __check_config__(self, config): 427 self.is_connect_wifi = get_config_value('connect-wifi', config, 428 is_list=False, default=False) 429 self.is_disconnect_wifi = get_config_value( 430 'disconnect-wifi-after-test', config, is_list=False, default=True) 431 self.wifi_kit = WifiKit() 432 self.min_external_store_space = get_config_value( 433 'min-external-store-space', config) 434 self.is_disable_dialing = get_config_value('disable-dialing', config) 435 self.is_test_harness = get_config_value('set-test-harness', config) 436 self.is_audio_silent = get_config_value('audio-silent', config) 437 self.build_prop_list = get_config_value('build-prop', config) 438 self.cust_prop_file = get_config_value('cust-prop-file', config) 439 self.cust_props = get_config_value('cust-prop', config) 440 self.is_enable_hook = get_config_value('enable-hook', config) 441 self.is_reboot_delay = get_config_value('reboot-delay', config) 442 self.is_remount = get_config_value('remount', config, default=True) 443 self.local_system_prop_file = NamedTemporaryFile(prefix='build', 444 suffix='.prop', 445 delete=False).name 446 447 def __setup__(self, device, **kwargs): 448 del kwargs 449 LOG.debug("ConfigKit setup, device:{}".format(device.device_sn)) 450 if self.is_remount: 451 remount(device) 452 self.is_prop_changed = self.modify_system_prop(device) 453 self.is_prop_changed = self.modify_cust_prop( 454 device) or self.is_prop_changed 455 456 keep_screen_on(device) 457 if self.is_enable_hook: 458 pass 459 if self.is_prop_changed: 460 device.reboot() 461 462 def __teardown__(self, device): 463 LOG.debug("ConfigKit teardown: device:{}".format(device.device_sn)) 464 if self.is_remount: 465 remount(device) 466 if self.is_connect_wifi and self.is_disconnect_wifi: 467 self.wifi_kit.__teardown__(device) 468 if self.is_prop_changed: 469 device.push_file(self.local_system_prop_file, 470 Props.Paths.system_build_prop_path) 471 device.execute_shell_command( 472 " ".join(["chmod 644", Props.Paths.system_build_prop_path])) 473 os.remove(self.local_system_prop_file) 474 475 for target_file, temp_file in self.local_cust_prop_file.items(): 476 device.push_file(temp_file, target_file) 477 device.execute_shell_command( 478 " ".join(["chmod 644", target_file])) 479 os.remove(temp_file) 480 481 def modify_system_prop(self, device): 482 prop_changed = False 483 new_props = {} 484 if self.is_disable_dialing: 485 new_props['ro.telephony.disable-call'] = 'true' 486 if self.is_test_harness: 487 new_props['ro.monkey'] = '1' 488 new_props['ro.test_harness'] = '1' 489 if self.is_audio_silent: 490 new_props['ro.audio.silent'] = '1' 491 for prop in self.build_prop_list: 492 if prop is None or prop.find("=") < 0 or len(prop.split("=")) != 2: 493 LOG.warning("The build prop:{} not match the format " 494 "'key=value'".format(prop)) 495 continue 496 new_props[prop.split("=")[0]] = prop.split("=")[1] 497 if new_props: 498 prop_changed = modify_props(device, self.local_system_prop_file, 499 Props.Paths.system_build_prop_path, 500 new_props) 501 return prop_changed 502 503 def modify_cust_prop(self, device): 504 prop_changed = False 505 cust_files = {} 506 new_props = {} 507 for cust_prop_file in self.cust_prop_file: 508 # the correct format should be "CustName:/cust/prop/absolutepath" 509 if len(cust_prop_file.split(":")) != 2: 510 LOG.error( 511 "The value %s of option cust-prop-file is incorrect" % 512 cust_prop_file) 513 continue 514 cust_files[cust_prop_file.split(":")[0]] = \ 515 cust_prop_file.split(":")[1] 516 for prop in self.cust_props: 517 # the correct format should be "CustName:key=value" 518 prop_infos = re.split(r'[:|=]', prop) 519 if len(prop_infos) != 3: 520 LOG.error( 521 "The value {} of option cust-prop is incorrect".format( 522 prop)) 523 continue 524 file_name, key, value = prop_infos 525 if file_name not in cust_files: 526 LOG.error( 527 "The custName {} must be in cust-prop-file option".format( 528 file_name)) 529 continue 530 props = new_props.setdefault(file_name, {}) 531 props[key] = value 532 533 for name in new_props.keys(): 534 cust_file = cust_files.get(name) 535 temp_cust_file = NamedTemporaryFile(prefix='cust', suffix='.prop', 536 delete=False).name 537 self.local_cust_prop_file[cust_file] = temp_cust_file 538 prop_changed = modify_props(device, temp_cust_file, cust_file, 539 new_props[name]) or prop_changed 540 return prop_changed 541 542 543@Plugin(type=Plugin.TEST_KIT, id=CKit.appinstall) 544class AppInstallKit(ITestKit): 545 def __init__(self): 546 self.app_list = "" 547 self.is_clean = "" 548 self.alt_dir = "" 549 self.ex_args = "" 550 self.installed_app = [] 551 self.paths = "" 552 self.is_pri_app = "" 553 self.pushed_hap_file = [] 554 555 def __check_config__(self, options): 556 self.app_list = get_config_value('test-file-name', options) 557 self.is_clean = get_config_value('cleanup-apps', options, False) 558 self.alt_dir = get_config_value('alt-dir', options, False) 559 if self.alt_dir and self.alt_dir.startswith("resource/"): 560 self.alt_dir = self.alt_dir[len("resource/"):] 561 self.ex_args = get_config_value('install-arg', options) 562 self.installed_app = [] 563 self.paths = get_config_value('paths', options) 564 self.is_pri_app = get_config_value('install-as-privapp', options, 565 False, default=False) 566 567 def __setup__(self, device, **kwargs): 568 del kwargs 569 LOG.debug("AppInstallKit setup, device:{}".format(device.device_sn)) 570 if len(self.app_list) == 0: 571 LOG.info("No app to install, skipping!") 572 return 573 for app in self.app_list: 574 if self.alt_dir: 575 app_file = get_file_absolute_path(app, self.paths, 576 self.alt_dir) 577 else: 578 app_file = get_file_absolute_path(app, self.paths) 579 if app_file is None: 580 LOG.error("The app file {} does not exist".format(app)) 581 continue 582 if app_file.endswith(".hap"): 583 # use install command directly 584 device.hdc_command("install {}".format(app_file)) 585 else: 586 result = device.install_package( 587 app_file, get_install_args( 588 device, app_file, self.ex_args)) 589 if not result or not result.startswith("Success"): 590 raise AppInstallError( 591 "Failed to install %s on %s. Reason:%s" % 592 (app_file, device.__get_serial__(), result)) 593 self.installed_app.append(app_file) 594 595 def __teardown__(self, device): 596 LOG.debug("AppInstallKit teardown: device:{}".format(device.device_sn)) 597 if self.is_clean and str(self.is_clean).lower() == "true": 598 for app in self.installed_app: 599 app_name = get_app_name(app) 600 601 if app_name: 602 device.hdc_command("uninstall {}". 603 format(app_name)) 604 time.sleep(20) 605 else: 606 LOG.warning("Can't find app_name for %s" % app) 607 if self.is_pri_app: 608 remount(device) 609 for pushed_file in self.pushed_hap_file: 610 device.execute_shell_command("rm -r %s" % pushed_file) 611 612 def install_hap(self, device, hap_file): 613 if self.is_pri_app: 614 LOG.info("Install hap as privileged app {}".format(hap_file)) 615 hap_name = os.path.basename(hap_file).replace(".hap", "") 616 try: 617 with TemporaryDirectory(prefix=hap_name) as temp_dir: 618 zif_file = zipfile.ZipFile(hap_file) 619 zif_file.extractall(path=temp_dir) 620 entry_app = os.path.join(temp_dir, "Entry.app") 621 push_dest_dir = os.path.join("/system/priv-app/", hap_name) 622 device.execute_shell_command("rm -rf " + push_dest_dir, 623 output_flag=False) 624 device.push_file(entry_app, os.path.join( 625 push_dest_dir + os.path.basename(entry_app))) 626 device.push_file(hap_file, os.path.join( 627 push_dest_dir + os.path.basename(hap_file))) 628 self.pushed_hap_file.append(os.path.join( 629 push_dest_dir + os.path.basename(hap_file))) 630 device.reboot() 631 except RuntimeError as exception: 632 msg = "Install hap app failed with error {}".format(exception) 633 LOG.error(msg) 634 raise Exception(msg) 635 except Exception as exception: 636 msg = "Install hap app failed with exception {}".format( 637 exception) 638 LOG.error(msg) 639 raise Exception(msg) 640 finally: 641 zif_file.close() 642 else: 643 output = device.hdc_command("install %s" % hap_file) 644 LOG.debug("Install %s, and output = %s" % (hap_file, output)) 645 646 @classmethod 647 def retry_install_hap(cls, device, command): 648 if device.usb_type == DeviceConnectorType.hdc: 649 # hdc -t UID -s tcp:IP:PORT 650 real_command = ["hdc", "-t", str(device.device_sn), "-s", 651 "tcp:%s:%s" % (str(device.host), str(device.port)), 652 "shell", command] 653 message = "%s execute command: %s" % \ 654 (convert_serial(device.device_sn), " ".join(real_command)) 655 LOG.info(message) 656 exec_out = "" 657 for wait_count in range(1, 4): 658 LOG.debug("retry times:%s, wait %ss" % 659 (wait_count, (wait_count * 10))) 660 time.sleep(wait_count * 10) 661 exec_out = exec_cmd(real_command) 662 if exec_out and exec_out.startswith("Success"): 663 break 664 if not exec_out: 665 exec_out = "System is not in %s" % ["Windows", "Linux", "Darwin"] 666 LOG.info("retry install hap result is: [%s]" % exec_out.strip()) 667 return exec_out 668 669 670@Plugin(type=Plugin.TEST_KIT, id=CKit.component) 671class ComponentKit(ITestKit): 672 673 def __init__(self): 674 self._white_list_file = "" 675 self._white_list = "" 676 self._cap_file = "" 677 self.paths = "" 678 self.cache_subsystem = set() 679 self.cache_part = set() 680 681 def __check_config__(self, config): 682 self._white_list_file =\ 683 get_config_value('white-list', config, is_list=False) 684 self._cap_file = get_config_value('cap-file', config, is_list=False) 685 self.paths = get_config_value('paths', config) 686 687 def __setup__(self, device, **kwargs): 688 if hasattr(device, ConfigConst.support_component): 689 return 690 if device.label in ["phone", "watch", "car", "tv", "tablet", "ivi"]: 691 command = "cat %s" % self._cap_file 692 result = device.execute_shell_command(command) 693 part_set = set() 694 subsystem_set = set() 695 if "{" in result: 696 for item in json.loads(result).get("components", []): 697 part_set.add(item.get("component", "")) 698 subsystems, parts = self.get_white_list() 699 part_set.update(parts) 700 subsystem_set.update(subsystems) 701 setattr(device, ConfigConst.support_component, 702 (subsystem_set, part_set)) 703 self.cache_subsystem.update(subsystem_set) 704 self.cache_part.update(part_set) 705 706 def get_cache(self): 707 return self.cache_subsystem, self.cache_part 708 709 def get_white_list(self): 710 if not self._white_list and self._white_list_file: 711 self._white_list = self._parse_white_list() 712 return self._white_list 713 714 def _parse_white_list(self): 715 subsystem = set() 716 part = set() 717 white_json_file = os.path.normpath(self._white_list_file) 718 if not os.path.isabs(white_json_file): 719 white_json_file = \ 720 get_file_absolute_path(white_json_file, self.paths) 721 if os.path.isfile(white_json_file): 722 subsystem_list = list() 723 flags = os.O_RDONLY 724 modes = stat.S_IWUSR | stat.S_IRUSR 725 with os.fdopen(os.open(white_json_file, flags, modes), 726 "r") as file_content: 727 json_result = json.load(file_content) 728 if "subsystems" in json_result.keys(): 729 subsystem_list.extend(json_result["subsystems"]) 730 for subsystem_item_list in subsystem_list: 731 for key, value in subsystem_item_list.items(): 732 if key == "subsystem": 733 subsystem.add(value) 734 elif key == "components": 735 for component_item in value: 736 if "component" in component_item.keys(): 737 part.add( 738 component_item["component"]) 739 740 return subsystem, part 741 742 def __teardown__(self, device): 743 if hasattr(device, ConfigConst.support_component): 744 setattr(device, ConfigConst.support_component, None) 745 self._white_list_file = "" 746 self._white_list = "" 747 self._cap_file = "" 748 self.cache_subsystem.clear() 749 self.cache_part.clear() 750 self.cache_device.clear() 751 752 753def remount(device): 754 cmd = "shell mount -o rw,remount /" \ 755 if device.usb_type == DeviceConnectorType.hdc else "remount" 756 device.hdc_command(cmd) 757 758 759def keep_screen_on(device): 760 device.execute_shell_command("svc power stayon true") 761 762 763def get_install_args(device, app_name, original_args=None): 764 """To obtain all the args of app install 765 Args: 766 original_args: the argus configure in .config file 767 device : the device will be installed app 768 app_name : the name of the app which will be installed 769 Returns: 770 All the args 771 """ 772 if original_args is None: 773 original_args = [] 774 new_args = original_args[:] 775 try: 776 sdk_version = device.get_property("ro.build.version.sdk") 777 if int(sdk_version) > 22: 778 new_args.append("-g") 779 except TypeError as type_error: 780 LOG.error("Obtain the sdk version failed with exception {}".format( 781 type_error)) 782 except ValueError as value_error: 783 LOG.error("Obtain the sdk version failed with exception {}".format( 784 value_error)) 785 if app_name.endswith(".apex"): 786 new_args.append("--apex") 787 return " ".join(new_args) 788 789 790def run_command(device, command): 791 LOG.debug("The command:{} is running".format(command)) 792 stdout = None 793 if command.strip() == "remount": 794 remount(device) 795 elif command.strip() == "reboot": 796 device.reboot() 797 elif command.strip() == "reboot-delay": 798 pass 799 else: 800 stdout = device.hdc_command("shell {}".format(command)) 801 LOG.debug("Run command result: %s" % (stdout if stdout else "")) 802 return stdout 803 804 805def junit_para_parse(device, junit_paras, prefix_char="-e"): 806 """To parse the para of junit 807 Args: 808 device: the device running 809 junit_paras: the para dict of junit 810 prefix_char: the prefix char of parsed cmd 811 Returns: 812 the new para using in a command like -e testFile xxx 813 -e coverage true... 814 """ 815 ret_str = [] 816 path = "/%s/%s/%s/%s" % ("data", "local", "tmp", "ajur") 817 include_file = "%s/%s" % (path, "includes.txt") 818 exclude_file = "%s/%s" % (path, "excludes.txt") 819 820 if not isinstance(junit_paras, dict): 821 LOG.warning("The para of junit is not the dict format as required") 822 return "" 823 # Disable screen keyguard 824 disable_key_guard = junit_paras.get('disable-keyguard') 825 if not disable_key_guard or disable_key_guard[0].lower() != 'false': 826 from xdevice_extension._core.driver.drivers import disable_keyguard 827 disable_keyguard(device) 828 829 for para_name in junit_paras.keys(): 830 path = "/%s/%s/%s/%s/" % ("data", "local", "tmp", "ajur") 831 if para_name.strip() == 'test-file-include-filter': 832 for file_name in junit_paras[para_name]: 833 device.push_file(file_name, include_file) 834 device.execute_shell_command( 835 'chown -R shell:shell %s' % path) 836 ret_str.append(" ".join([prefix_char, 'testFile', include_file])) 837 elif para_name.strip() == "test-file-exclude-filter": 838 for file_name in junit_paras[para_name]: 839 device.push_file(file_name, include_file) 840 device.execute_shell_command( 841 'chown -R shell:shell %s' % path) 842 ret_str.append(" ".join([prefix_char, 'notTestFile', 843 exclude_file])) 844 elif para_name.strip() == "test" or para_name.strip() == "class": 845 result = _get_class(junit_paras, prefix_char, para_name.strip()) 846 ret_str.append(result) 847 elif para_name.strip() == "include-annotation": 848 ret_str.append(" ".join([prefix_char, "annotation", 849 ",".join(junit_paras[para_name])])) 850 elif para_name.strip() == "exclude-annotation": 851 ret_str.append(" ".join([prefix_char, "notAnnotation", 852 ",".join(junit_paras[para_name])])) 853 else: 854 ret_str.append(" ".join([prefix_char, para_name, 855 ",".join(junit_paras[para_name])])) 856 857 return " ".join(ret_str) 858 859 860def timeout_callback(proc): 861 try: 862 LOG.error("Error: execute command timeout.") 863 LOG.error(proc.pid) 864 if platform.system() != "Windows": 865 os.killpg(proc.pid, signal.SIGKILL) 866 else: 867 subprocess.call( 868 ["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID", 869 str(proc.pid)], shell=False) 870 except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error: 871 LOG.exception("timeout callback exception: %s" % error, exc_info=False) 872 873 874def _get_class(junit_paras, prefix_char, para_name): 875 if not junit_paras.get(para_name): 876 return "" 877 result = "" 878 if prefix_char == "-e": 879 result = " %s class " % prefix_char 880 elif prefix_char == "--": 881 result = " %sclass " % prefix_char 882 elif prefix_char == "-s": 883 result = " %s class " % prefix_char 884 test_items = [] 885 for test in junit_paras.get(para_name): 886 test_item = test.split("#") 887 if len(test_item) == 1 or len(test_item) == 2: 888 test_item = "%s" % test 889 test_items.append(test_item) 890 elif len(test_item) == 3: 891 test_item = "%s#%s" % (test_item[1], test_item[2]) 892 test_items.append(test_item) 893 else: 894 raise ParamError("The parameter %s %s is error" % ( 895 prefix_char, para_name)) 896 if not result: 897 LOG.debug("there is unsolved prefix char: %s ." % prefix_char) 898 return result + ",".join(test_items) 899 900 901def gtest_para_parse(gtest_paras, runner): 902 """To parse the para of gtest 903 Args: 904 gtest_paras: the para dict of gtest 905 Returns: 906 the new para using in gtest 907 """ 908 ret_str = [] 909 if not isinstance(gtest_paras, dict): 910 LOG.warning("The para of gtest is not the dict format as required") 911 return "" 912 for para in gtest_paras.keys(): 913 if para.strip() == 'test-file-include-filter': 914 case_list = [] 915 files = gtest_paras.get(para) 916 for case_file in files: 917 flags = os.O_RDONLY 918 modes = stat.S_IWUSR | stat.S_IRUSR 919 with os.fdopen(os.open(case_file, flags, modes), 920 "r") as file_desc: 921 case_list.extend(file_desc.read().splitlines()) 922 923 runner.add_instrumentation_arg("gtest_filter", ":".join(case_list)) 924 925 return " ".join(ret_str) 926 927 928def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None): 929 if not ignore_keys and not isinstance(ignore_keys, list): 930 ignore_keys = ["class", "test"] 931 lines = junit_para_str.split(prefix_char) 932 normal_lines = [] 933 for line in lines: 934 line = line.strip() 935 if line: 936 items = line.split() 937 if items[0].strip() in ignore_keys: 938 continue 939 normal_lines.append("{} {}".format(prefix_char, line)) 940 return " ".join(normal_lines) 941 942 943def get_app_name(hap_app): 944 hap_name = os.path.basename(hap_app).replace(".hap", "") 945 app_name = "" 946 with TemporaryDirectory(prefix=hap_name) as temp_dir: 947 zif_file = zipfile.ZipFile(hap_app) 948 zif_file.extractall(path=temp_dir) 949 config_json_file = os.path.join(temp_dir, "config.json") 950 name_list = ["module.json", "config.json"] 951 for f_name in os.listdir(temp_dir): 952 if f_name in name_list: 953 config_json_file = os.path.join(temp_dir, f_name) 954 break 955 if not os.path.exists(config_json_file): 956 LOG.debug("Neither config.json nor module.json in %s.hap" 957 % hap_name) 958 else: 959 flags = os.O_RDONLY 960 modes = stat.S_IWUSR | stat.S_IRUSR 961 with os.fdopen(os.open(config_json_file, flags, modes), 962 "r") as file_desc: 963 attrs = json.loads(file_desc.read()) 964 if "app" in attrs.keys() and \ 965 "bundleName" in attrs.get("app", dict()).keys(): 966 app_name = attrs["app"]["bundleName"] 967 LOG.info("obtain the app name {} from json " 968 "successfully".format(app_name)) 969 else: 970 LOG.debug("'app' or 'bundleName' not " 971 "in %s.hap/config.json" % hap_name) 972 zif_file.close() 973 return app_name 974 975 976def oh_jsunit_para_parse(runner, junit_paras): 977 junit_paras = dict(junit_paras) 978 test_type_list = ["function", "performance", "reliability", "security"] 979 size_list = ["small", "medium", "large"] 980 level_list = ["0", "1", "2", "3"] 981 for para_name in junit_paras.keys(): 982 para_name = para_name.strip() 983 para_values = junit_paras.get(para_name, []) 984 if para_name == "class": 985 runner.add_arg(para_name, ",".join(para_values)) 986 elif para_name == "notClass": 987 runner.add_arg(para_name, ",".join(para_values)) 988 elif para_name == "testType": 989 if para_values[0] not in test_type_list: 990 continue 991 # function/performance/reliability/security 992 runner.add_arg(para_name, para_values[0]) 993 elif para_name == "size": 994 if para_values[0] not in size_list: 995 continue 996 # size small/medium/large 997 runner.add_arg(para_name, para_values[0]) 998 elif para_name == "level": 999 if para_values[0] not in level_list: 1000 continue 1001 # 0/1/2/3/4 1002 runner.add_arg(para_name, para_values[0]) 1003