#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import re
import shutil
import time
import platform
import zipfile
import stat
from dataclasses import dataclass

from xdevice import DeviceTestType
from xdevice import DeviceLabelType
from xdevice import ExecuteTerminate
from xdevice import DeviceError
from xdevice import ShellHandler

from xdevice import IDriver
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import get_plugin
from xdevice_extension._core.constants import CommonParserType
from xdevice_extension._core.environment.dmlib import process_command_ret
from core.utils import get_decode
from core.utils import get_fuzzer_path
from core.config.resource_manager import ResourceManager
from core.config.config_manager import FuzzerConfigManager


__all__ = [
    "CppTestDriver",
    "JSUnitTestDriver",
    "disable_keyguard",
    "GTestConst"]

LOG = platform_logger("Drivers")
DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")

# Timeout passed to device.execute_shell_command() for test execution.
TIME_OUT = 900 * 1000

# Characters removed from messages before they are embedded in an XML
# attribute of a generated result file.
_XML_UNSAFE_CHARS = str.maketrans("", "", "\"<>&")


##############################################################################
##############################################################################


class DisplayOutputReceiver:
    """
    Receiver for shell command output: accumulates everything it is given in
    ``output`` and logs each complete, non-blank line.
    """

    def __init__(self):
        self.output = ""
        self.unfinished_line = ""

    def _process_output(self, output, end_mark="\n"):
        """
        Split ``output`` into complete lines.

        A trailing fragment that is not terminated by ``end_mark`` is kept in
        ``self.unfinished_line`` and prepended to the next chunk.
        """
        content = output
        if self.unfinished_line:
            content = "".join((self.unfinished_line, content))
            self.unfinished_line = ""
        lines = content.split(end_mark)
        if not content.endswith(end_mark):
            self.unfinished_line = lines[-1]
        # The last element is either "" (terminated) or the saved fragment.
        return lines[:-1]

    def __read__(self, output):
        """Append ``output`` to the buffer and log its complete lines."""
        self.output = "%s%s" % (self.output, output)
        for line in self._process_output(output):
            line = line.strip()
            if line:
                LOG.info(get_decode(line))

    def __error__(self, message):
        pass

    def __done__(self, result_code="", message=""):
        pass


@dataclass
class GTestConst(object):
    """Names of the command-line options used to select gtest cases."""
    exec_para_filter = "--gtest_filter"
    exec_para_level = "--gtest_testsize"


def get_device_log_file(report_path, serial=None, log_name="device_log"):
    """
    Return the path ``<report log dir>/<log_name>_<serial>.log``, creating
    the log directory if needed. ``time.time_ns()`` replaces a falsy serial.
    """
    from xdevice import Variables
    log_path = os.path.join(report_path, Variables.report_vars.log_dir)
    os.makedirs(log_path, exist_ok=True)

    serial = serial or time.time_ns()
    device_file_name = "{}_{}.log".format(log_name, serial)
    return os.path.join(log_path, device_file_name)


def get_level_para_string(level_string):
    """
    Convert a comma-separated list of numeric levels into the
    ``LevelN,LevelM`` form.

    Surrounding spaces are stripped before validation, non-numeric items are
    ignored, and duplicates are dropped while keeping first-seen order.
    """
    levels = []
    for raw_item in level_string.split(","):
        # Strip first so that "1, 2" keeps both levels.
        item = raw_item.strip(" ")
        if item.isdigit() and item not in levels:
            levels.append(item)
    return ",".join("Level%s" % item for item in levels)


def get_result_savepath(testsuit_path, result_rootpath):
    """
    Return (and create) the directory in which the result of the given test
    suite is stored.

    The directory is ``<result_rootpath>/result``; when the suite directory
    contains ``/tests/<first>/<rest>``, ``<rest>`` is appended to it.
    """
    findkey = os.sep + "tests" + os.sep
    filedir, _ = os.path.split(testsuit_path)
    result_path = os.path.join(result_rootpath, "result")

    pos = filedir.find(findkey)
    if pos != -1:
        subpath = filedir[pos + len(findkey):]
        pos1 = subpath.find(os.sep)
        if pos1 != -1:
            # Skip the first path component that follows "tests".
            result_path = os.path.join(result_path,
                                       subpath[pos1 + len(os.sep):])

    os.makedirs(result_path, exist_ok=True)

    LOG.info("result_savepath = " + result_path)
    return result_path


def _strip_xml_unsafe_chars(message):
    """Return ``str(message)`` with the characters ``" < > &`` removed."""
    return str(message).translate(_XML_UNSAFE_CHARS)


def _current_time_stamp():
    """Return the local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _write_file_if_absent(filepath, lines):
    """Write ``lines`` to ``filepath`` (UTF-8) unless the file exists."""
    if os.path.exists(filepath):
        return
    with open(filepath, "w", encoding='utf-8') as file_desc:
        file_desc.writelines(lines)


# all testsuit common Unavailable test result xml
def _create_empty_result_file(filepath, filename, error_message):
    """
    Create an "unavailable" result XML for the suite ``filename`` carrying
    ``error_message``; an existing file is left untouched.
    """
    error_message = _strip_xml_unsafe_chars(error_message)
    if filename.endswith(".hap"):
        filename = filename.split(".")[0]
    _write_file_if_absent(filepath, [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<testsuites tests="0" failures="0" '
        'disabled="0" errors="0" timestamp="%s" '
        'time="0" name="AllTests">\n' % _current_time_stamp(),
        ' <testsuite name="%s" tests="0" failures="0" '
        'disabled="0" errors="0" time="0.0" '
        'unavailable="1" message="%s">\n' %
        (filename, error_message),
        ' </testsuite>\n',
        '</testsuites>\n',
    ])


def _unlock_screen(device):
    """Run ``svc power stayon true`` on the device."""
    device.execute_shell_command("svc power stayon true")
    time.sleep(1)


def _unlock_device(device):
    """Send key event 82 and dismiss the keyguard on the device."""
    device.execute_shell_command("input keyevent 82")
    time.sleep(1)
    device.execute_shell_command("wm dismiss-keyguard")
    time.sleep(1)


def _lock_screen(device):
    """Run ``svc power stayon false`` on the device."""
    device.execute_shell_command("svc power stayon false")
    time.sleep(1)


def disable_keyguard(device):
    """Keep the screen on and dismiss the keyguard of the device."""
    _unlock_screen(device)
    _unlock_device(device)


def _sleep_according_to_result(result):
    """Sleep for one second when ``result`` is truthy."""
    if result:
        time.sleep(1)


def _create_fuzz_crash_file(filepath, filename):
    """
    Create a result XML that reports one failed case (fuzzer crash) for
    ``filename``; an existing file is left untouched.
    """
    _write_file_if_absent(filepath, [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<testsuites disabled="0" name="AllTests" '
        'time="300" timestamp="%s" errors="0" '
        'failures="1" tests="1">\n' % _current_time_stamp(),
        ' <testsuite disabled="0" name="%s" time="300" '
        'errors="0" failures="1" tests="1">\n' % filename,
        ' <testcase name="%s" time="300" classname="%s" '
        'status="run">\n' % (filename, filename),
        ' <failure type="" '
        'message="Fuzzer crash. See ERROR in log file">\n',
        ' </failure>\n',
        ' </testcase>\n',
        ' </testsuite>\n',
        '</testsuites>\n',
    ])


def _create_fuzz_pass_file(filepath, filename):
    """
    Create a result XML that reports one passed case for ``filename``; an
    existing file is left untouched.
    """
    _write_file_if_absent(filepath, [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<testsuites disabled="0" name="AllTests" '
        'time="300" timestamp="%s" errors="0" '
        'failures="0" tests="1">\n' % _current_time_stamp(),
        ' <testsuite disabled="0" name="%s" time="300" '
        'errors="0" failures="0" tests="1">\n' % filename,
        ' <testcase name="%s" time="300" classname="%s" '
        'status="run"/>\n' % (filename, filename),
        ' </testsuite>\n',
        '</testsuites>\n',
    ])


def _create_fuzz_result_file(filepath, filename, error_message):
    """
    Create the result XML of a fuzz test from its console output.

    Output containing ``AddressSanitizer`` is reported as a crash, output
    matching ``Done N runs in M second`` as a pass, anything else as an
    unavailable result.
    """
    error_message = _strip_xml_unsafe_chars(error_message)
    if "AddressSanitizer" in error_message:
        LOG.error("FUZZ TEST CRASH")
        _create_fuzz_crash_file(filepath, filename)
    elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
                   error_message, re.M) is not None:
        LOG.info("FUZZ TEST PASS")
        _create_fuzz_pass_file(filepath, filename)
    else:
        LOG.error("FUZZ TEST UNAVAILABLE")
        _create_empty_result_file(filepath, filename, error_message)


##############################################################################
##############################################################################

class ResultManager(object):
    """Collects result, benchmark and coverage files of one test suite."""

    def __init__(self, testsuit_path, config):
        self.testsuite_path = testsuit_path
        self.config = config
        self.result_rootpath = self.config.report_path
        self.device = self.config.device
        # hap suites write their results to the hap output directory.
        if testsuit_path.endswith(".hap"):
            self.device_testpath = self.config.test_hap_out_path
        else:
            self.device_testpath = self.config.target_test_path
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable pulling of coverage data."""
        self.is_coverage = is_coverage

    def get_test_results(self, error_message=""):
        """
        Fetch the result file of the suite and return its local path.

        For fuzz tests the result file is generated from ``error_message``.
        Otherwise an "unavailable" file is created when nothing could be
        pulled, and benchmark/coverage data are fetched when applicable.
        """
        # Get test result files
        filepath = self.obtain_test_result_file()
        if "fuzztest" == self.config.testtype[0]:
            LOG.info("create fuzz test report")
            _create_fuzz_result_file(filepath, self.testsuite_name,
                                     error_message)
            return filepath
        if not os.path.exists(filepath):
            _create_empty_result_file(filepath, self.testsuite_name,
                                      error_message)
        if "benchmark" == self.config.testtype[0]:
            self._obtain_benchmark_result()
        # Get coverage data files
        if self.is_coverage:
            self.obtain_coverage_data()

        return filepath

    def _obtain_benchmark_result(self):
        """
        Pull ``<suite>.json`` from the device into the benchmark directory
        and return that directory; the directory is removed again when the
        file was not pulled.
        """
        benchmark_root_dir = os.path.abspath(
            os.path.join(self.result_rootpath, "benchmark"))
        benchmark_dir = os.path.abspath(
            os.path.join(benchmark_root_dir,
                         self.get_result_sub_save_path(),
                         self.testsuite_name))

        os.makedirs(benchmark_dir, exist_ok=True)

        LOG.info("benchmark_dir = %s" % benchmark_dir)
        json_name = "%s.json" % self.testsuite_name
        self.device.pull_file(os.path.join(self.device_testpath, json_name),
                              benchmark_dir)
        if not os.path.exists(os.path.join(benchmark_dir, json_name)):
            os.rmdir(benchmark_dir)
        return benchmark_dir

    def get_result_sub_save_path(self):
        """
        Return the part of the suite directory that follows ``/benchmark/``,
        or an empty string when the directory has no such component.
        """
        find_key = os.sep + "benchmark" + os.sep
        file_dir, _ = os.path.split(self.testsuite_path)
        pos = file_dir.find(find_key)
        subpath = ""
        if pos != -1:
            subpath = file_dir[pos + len(find_key):]
        LOG.info("subpath = " + subpath)
        return subpath

    def obtain_test_result_file(self):
        """
        Pull the XML result from the device, falling back to the JSON result,
        and return the local path of the file that was requested.

        When neither remote file exists the local XML path is returned even
        though nothing was pulled.
        """
        result_save_path = get_result_savepath(self.testsuite_path,
                                               self.result_rootpath)
        result_file_path = os.path.join(result_save_path,
                                        "%s.xml" % self.testsuite_name)
        result_json_file_path = os.path.join(result_save_path,
                                             "%s.json" % self.testsuite_name)

        if self.testsuite_path.endswith('.hap'):
            remote_xml_name = "testcase_result.xml"
        else:
            remote_xml_name = "%s.xml" % self.testsuite_name
        remote_result_file = os.path.join(self.device_testpath,
                                          remote_xml_name)
        remote_json_result_file = os.path.join(self.device_testpath,
                                               "%s.json" % self.testsuite_name)

        if self.device.is_file_exist(remote_result_file):
            self.device.pull_file(remote_result_file, result_file_path)
        elif self.device.is_file_exist(remote_json_result_file):
            self.device.pull_file(remote_json_result_file,
                                  result_json_file_path)
            result_file_path = result_json_file_path
        else:
            LOG.error("%s not exist", remote_result_file)

        return result_file_path

    def make_empty_result_file(self, error_message=""):
        """Create an "unavailable" result XML for the suite if none exists."""
        result_savepath = get_result_savepath(self.testsuite_path,
                                              self.result_rootpath)
        result_filepath = os.path.join(result_savepath, "%s.xml" %
                                       self.testsuite_name)
        if not os.path.exists(result_filepath):
            _create_empty_result_file(result_filepath,
                                      self.testsuite_name, error_message)

    def is_exist_target_in_device(self, path, target):
        """
        Return True when the output of ``ls -l <path> | grep <target>`` on
        the device contains ``target``.
        """
        if platform.system() == "Windows":
            # The command is wrapped in double quotes on Windows hosts.
            command = '\"ls -l %s | grep %s\"' % (path, target)
        else:
            command = "ls -l %s | grep %s" % (path, target)

        stdout_info = self.device.execute_shell_command(command)
        return stdout_info != "" and stdout_info.find(target) != -1

    def obtain_coverage_data(self):
        """
        Pull ``<DEFAULT_TEST_PATH>obj`` from the device into
        ``<report>/../coverage/data/cxx/<suite>`` when it exists.
        """
        target_name = "obj"
        cxx_cov_path = os.path.abspath(os.path.join(
            self.result_rootpath,
            "..",
            "coverage",
            "data",
            "cxx",
            self.testsuite_name))

        if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
            os.makedirs(cxx_cov_path, exist_ok=True)
            src_file = os.path.join(DEFAULT_TEST_PATH, target_name)
            self.device.pull_file(src_file, cxx_cov_path, is_create=True)


##############################################################################
##############################################################################

@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
class CppTestDriver(IDriver):
    """
    CppTest is a Test that runs a native test package on given device.
    """
    # test driver config
    config = None
    result = ""

    def __check_environment__(self, device_options):
        """
        Accept exactly one device option whose label is either unset or
        ``DeviceLabelType.phone``.
        """
        if len(device_options) != 1:
            return False
        label = device_options[0].label
        return label is None or label == DeviceLabelType.phone

    def __check_config__(self, config):
        pass

    def __result__(self):
        """Return the result file path, or "" when the file does not exist."""
        return self.result if os.path.exists(self.result) else ""

    def __execute__(self, request):
        """
        Run the test suite referenced by ``request`` on the first device of
        the environment while capturing the device log.
        """
        try:
            self.config = request.config
            self.config.target_test_path = DEFAULT_TEST_PATH
            self.config.device = request.config.environment.devices[0]

            suite_file = request.root.source.source_file
            LOG.debug("Testsuite FilePath: %s" % suite_file)

            if not suite_file:
                LOG.error("test source '%s' not exists" %
                          request.root.source.source_string)
                return

            if not self.config.device:
                result = ResultManager(suite_file, self.config)
                result.set_is_coverage(False)
                result.make_empty_result_file(
                    "No test device is found. ")
                return

            serial = request.config.device.__get_serial__()
            device_log_file = get_device_log_file(
                request.config.report_path,
                serial)

            with open(device_log_file, "a", encoding="UTF-8") as file_pipe:
                self.config.device.start_catch_device_log(file_pipe)
                self._init_gtest()
                self._run_gtest(suite_file)
        finally:
            # The device may be missing (no-device path, or a failure before
            # it was assigned); do not mask the original outcome with an
            # AttributeError.
            device = getattr(self.config, "device", None)
            if device:
                device.stop_catch_device_log()

    def _init_gtest(self):
        """Remount the device and recreate the target test directory."""
        device = self.config.device
        device.hdc_command("target mount")
        device.execute_shell_command(
            "rm -rf %s" % self.config.target_test_path)
        device.execute_shell_command(
            "mkdir -p %s" % self.config.target_test_path)
        device.execute_shell_command(
            "mount -o rw,remount,rw /")
        if "fuzztest" == self.config.testtype[0]:
            device.execute_shell_command(
                "mkdir -p %s" % os.path.join(self.config.target_test_path,
                                             "corpus"))

    def _run_gtest(self, suite_file):
        """
        Push the suite and its resources to the device, execute it and store
        the path of the collected result file in ``self.result``.
        """
        filename = os.path.basename(suite_file)
        test_para = self._get_test_para(self.config.testcase,
                                        self.config.testlevel,
                                        self.config.testtype,
                                        self.config.target_test_path,
                                        filename)
        is_coverage_test = bool(self.config.coverage)

        # push testsuite file
        self.config.device.push_file(suite_file, self.config.target_test_path)
        self._push_corpus_if_exist(filename)

        # push resource files
        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = \
            resource_manager.get_resource_data_dic(suite_file)
        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
                                               self.config.device)

        # execute testcase
        if not self.config.coverage:
            command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
                self.config.target_test_path,
                filename,
                filename,
                test_para)
        else:
            coverage_outpath = self.config.coverage_outpath
            strip_num = len(coverage_outpath.split(os.sep)) - 1
            command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \
                      "GCOV_PREFIX_STRIP=%s ./%s %s" % \
                      (self.config.target_test_path,
                       filename,
                       str(strip_num),
                       filename,
                       test_para)

        result = ResultManager(suite_file, self.config)
        result.set_is_coverage(is_coverage_test)

        try:
            # get result
            display_receiver = DisplayOutputReceiver()
            self.config.device.execute_shell_command(
                command,
                receiver=display_receiver,
                timeout=TIME_OUT,
                retry=0)
            return_message = display_receiver.output
        except (ExecuteTerminate, DeviceError) as exception:
            return_message = str(exception.args)

        self.result = result.get_test_results(return_message)
        resource_manager.process_cleaner_data(resource_data_dic,
                                              resource_dir,
                                              self.config.device)

    def _push_corpus_if_exist(self, filename):
        """Push the fuzzer corpus directory to the device for fuzz tests."""
        if "fuzztest" == self.config.testtype[0]:
            corpus_path = os.path.join(get_fuzzer_path(filename), "corpus")
            self.config.device.push_file(
                corpus_path,
                os.path.join(self.config.target_test_path, "corpus"))

    @staticmethod
    def _get_test_para(testcase,
                       testlevel,
                       testtype,
                       target_test_path,
                       filename):
        """
        Build the command-line arguments of the test binary.

        Benchmark runs get the JSON output options only. Otherwise a case
        filter or a level filter is produced when exactly one of
        ``testcase``/``testlevel`` is set, and for fuzz tests the corpus
        directory and the limits read from ``project.xml`` are appended.
        """
        if "benchmark" == testtype[0]:
            test_para = (" --benchmark_out_format=json"
                         " --benchmark_out=%s%s.json") % (
                            target_test_path, filename)
            return test_para

        if "" != testcase and "" == testlevel:
            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
        elif "" == testcase and "" != testlevel:
            level_para = get_level_para_string(testlevel)
            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
        else:
            test_para = ""

        if "fuzztest" == testtype[0]:
            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
                filename), "project.xml")).get_fuzzer_config("fuzztest")
            LOG.info("config list :%s" % str(cfg_list))
            fuzz_para = "corpus -max_len=" + cfg_list[0] + \
                        " -max_total_time=" + cfg_list[1] + \
                        " -rss_limit_mb=" + cfg_list[2]
            # Keep the fuzz arguments separate from a preceding filter
            # argument instead of gluing "corpus" onto its value.
            if test_para:
                test_para = "%s %s" % (test_para, fuzz_para)
            else:
                test_para = fuzz_para
        return test_para


##############################################################################
##############################################################################

@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
class JSUnitTestDriver(IDriver):
    """
    JSUnitTestDriver installs a test hap on the given device, starts its
    main ability and parses the JSApp lines of the captured device log.
    """

    def __init__(self):
        self.config = None
        self.result = ""
        self.start_time = None
        self.ability_name = ""
        self.package_name = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __result__(self):
        """Return the result file path, or "" when the file does not exist."""
        return self.result if os.path.exists(self.result) else ""

    def __execute__(self, request):
        """
        Run the hap referenced by ``request`` on the first device of the
        environment and feed the captured log to the jsunit parser.
        """
        try:
            LOG.info("developertest driver")
            self.result = os.path.join(
                request.config.report_path, "result",
                '.'.join((request.get_module_name(), "xml")))
            self.config = request.config
            self.config.target_test_path = DEFAULT_TEST_PATH
            self.config.device = request.config.environment.devices[0]

            suite_file = request.root.source.source_file
            if not suite_file:
                LOG.error("test source '%s' not exists" %
                          request.root.source.source_string)
                return

            if not self.config.device:
                result = ResultManager(suite_file, self.config)
                result.set_is_coverage(False)
                result.make_empty_result_file(
                    "No test device is found")
                return

            package_name, ability_name = self._get_package_and_ability_name(
                suite_file)
            self.package_name = package_name
            self.ability_name = ability_name
            self.config.test_hap_out_path = \
                "/data/data/%s/files/" % self.package_name
            self.config.device.hdc_command("shell hilog -r")

            hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__() + "_" +
                request.get_module_name(),
                "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o755)

            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.start_catch_device_log(hilog_file_pipe)
                self._init_jsunit_test()
                self._run_jsunit(suite_file)
                # Make sure the captured log is on disk before it is parsed.
                hilog_file_pipe.flush()
                self.generate_console_output(hilog, request)
        finally:
            # The device may be missing (no-device path, or a failure before
            # it was assigned); do not mask the original outcome with an
            # AttributeError.
            device = getattr(self.config, "device", None)
            if device:
                device.stop_catch_device_log()

    def _init_jsunit_test(self):
        """Remount the device and recreate the target test directory."""
        device = self.config.device
        device.hdc_command("target mount")
        device.execute_shell_command(
            "rm -rf %s" % self.config.target_test_path)
        device.execute_shell_command(
            "mkdir -p %s" % self.config.target_test_path)
        device.execute_shell_command(
            "mount -o rw,remount,rw /")

    def _run_jsunit(self, suite_file):
        """
        Prepare resources, install the hap, run it and uninstall it again;
        an "install failed" result is recorded when installation fails.
        """
        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = \
            resource_manager.get_resource_data_dic(suite_file)
        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
                                               self.config.device)

        main_result = self._install_hap(suite_file)
        result = ResultManager(suite_file, self.config)

        if main_result:
            self._execute_hapfile_jsunittest()
            self._uninstall_hap(self.package_name)
        else:
            self.result = result.get_test_results("Error: install hap failed")
            LOG.error("Error: install hap failed")

        resource_manager.process_cleaner_data(resource_data_dic, resource_dir,
                                              self.config.device)

    def generate_console_output(self, device_log_file, request):
        """
        Feed the JSApp lines of ``device_log_file`` to the first registered
        jsunit parser, reporting to the listeners of ``request``.
        """
        result_message = self.read_device_log(device_log_file)

        report_name = request.get_module_name()
        parsers = get_plugin(
            Plugin.PARSER, CommonParserType.jsunit)
        if parsers:
            parsers = parsers[:1]
        for listener in request.listeners:
            listener.device_sn = self.config.device.device_sn
        parser_instances = []

        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suites_name = report_name
            parser_instance.suite_name = report_name
            parser_instance.listeners = request.listeners
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)
        process_command_ret(result_message, handler)

    def read_device_log(self, device_log_file):
        """
        Return the concatenated lines of ``device_log_file`` that contain
        ``JSApp:``.

        Reading stops at end of file, at the first blank line, or after the
        JSApp line that contains ``[end] run suites end``.
        """
        device_log_file_open = os.open(device_log_file, os.O_RDONLY,
                                       stat.S_IWUSR | stat.S_IRUSR)

        matched_lines = []
        with os.fdopen(device_log_file_open, "r", encoding='utf-8') \
                as file_read_pipe:
            while True:
                data = file_read_pipe.readline()
                if not data or not data.strip():
                    break
                # only filter JSApp log
                if data.find("JSApp:") != -1:
                    matched_lines.append(data)
                    if data.find("[end] run suites end") != -1:
                        break
        return "".join(matched_lines)

    def _execute_hapfile_jsunittest(self):
        """Unlock the device, start the hap ability, then lock the screen."""
        _unlock_screen(self.config.device)
        _unlock_device(self.config.device)

        try:
            return_message = self.start_hap_activity()
        except (ExecuteTerminate, DeviceError) as exception:
            return_message = str(exception.args)

        _lock_screen(self.config.device)
        return return_message

    def _install_hap(self, suite_file):
        """
        Install the hap through ``hdc install`` and return True when the
        command printed nothing or its output contains ``success``.
        """
        message = self.config.device.hdc_command("install %s" % suite_file)
        message = str(message).rstrip()
        if message == "" or "success" in message:
            return_code = True
            if message != "":
                LOG.info(message)
        else:
            return_code = False
            if message != "":
                LOG.warning(message)

        _sleep_according_to_result(return_code)
        return return_code

    def start_hap_activity(self):
        """
        Start ``<package>.MainAbility`` with ``aa start`` and return the
        command output, or the exception arguments when the command failed.
        """
        try:
            command = "aa start -d 123 -a %s.MainAbility -b %s" \
                      % (self.package_name, self.package_name)
            self.start_time = time.time()
            result_value = self.config.device.execute_shell_command(
                command, timeout=TIME_OUT)

            if "success" in str(result_value).lower():
                LOG.info("execute %s's testcase success. result value=%s"
                         % (self.package_name, result_value))
                # Pause after a successful start before continuing.
                time.sleep(30)
            else:
                LOG.info("execute %s's testcase failed. result value=%s"
                         % (self.package_name, result_value))

            _sleep_according_to_result(result_value)
            return_message = result_value
        except (ExecuteTerminate, DeviceError) as exception:
            return_message = exception.args

        return return_message

    def _uninstall_hap(self, package_name):
        """Uninstall ``package_name`` with ``bm uninstall`` and return the
        command output."""
        return_message = self.config.device.execute_shell_command(
            "bm uninstall -n %s" % package_name)
        _sleep_according_to_result(return_message)
        return return_message

    @staticmethod
    def _get_package_and_ability_name(hap_filepath):
        """
        Read the package name and the first ability name from the
        ``config.json`` packed inside the hap.

        The hap is extracted to a temporary ``<hap>.bak`` directory next to
        it, which is always removed again. Returns ``("", "")`` when the hap
        or its ``config.json`` is missing; either value stays ``""`` when it
        is not present in the profile.
        """
        package_name = ""
        ability_name = ""

        if not os.path.exists(hap_filepath):
            print("file %s not exist" % hap_filepath)
            return package_name, ability_name

        filename = os.path.basename(hap_filepath)
        hap_bak_path = os.path.abspath(os.path.join(
            os.path.dirname(hap_filepath),
            "%s.bak" % filename))

        try:
            # unzip the hap file
            with zipfile.ZipFile(hap_filepath) as zf_desc:
                try:
                    zf_desc.extractall(path=hap_bak_path)
                except RuntimeError as error:
                    print(error)

            # verify config.json file
            app_profile_path = os.path.join(hap_bak_path, "config.json")
            if not os.path.exists(app_profile_path):
                print("file %s not exist" % app_profile_path)
                return package_name, ability_name

            if os.path.isdir(app_profile_path):
                print("%s is a folder, and not a file" % app_profile_path)
                return package_name, ability_name

            # get package_name and ability_name value
            with open(app_profile_path, 'r', encoding='utf-8') as load_f:
                load_dict = json.load(load_f)

            for profile in load_dict.values():
                found_package = profile.get("package")
                if not found_package:
                    continue
                package_name = found_package

                # Only the first ability of the first profile that declares
                # a package is used.
                for ability in profile.get("abilities") or []:
                    declared_name = ability.get("name") or ""
                    if declared_name.startswith("."):
                        # A relative name is qualified with the package.
                        ability_name = package_name + declared_name
                    else:
                        ability_name = declared_name
                    break
                break
        finally:
            # delete hap_bak_path, also on early return or failure
            if os.path.exists(hap_bak_path):
                shutil.rmtree(hap_bak_path)

        return package_name, ability_name