1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import re 22import shutil 23import subprocess 24import time 25import platform 26import zipfile 27import stat 28from dataclasses import dataclass 29from json import JSONDecodeError 30 31from xdevice import DeviceTestType 32from xdevice import DeviceLabelType 33from xdevice import CommonParserType 34from xdevice import ExecuteTerminate 35from xdevice import DeviceError 36from xdevice import ShellHandler 37 38from xdevice import IDriver 39from xdevice import platform_logger 40from xdevice import Plugin 41from xdevice import get_plugin 42from ohos.environment.dmlib import process_command_ret 43from core.utils import get_decode 44from core.utils import get_fuzzer_path 45from core.config.resource_manager import ResourceManager 46from core.config.config_manager import FuzzerConfigManager 47 48 49__all__ = [ 50 "CppTestDriver", 51 "JSUnitTestDriver", 52 "disable_keyguard", 53 "GTestConst"] 54 55LOG = platform_logger("Drivers") 56DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 57 58TIME_OUT = 900 * 1000 59JS_TIMEOUT = 10 60CYCLE_TIMES = 30 61 62 63############################################################################## 64############################################################################## 65 66 67class DisplayOutputReceiver: 68 def __init__(self): 69 self.output = "" 70 self.unfinished_line = "" 71 72 def _process_output(self, output, end_mark="\n"): 73 content = output 74 if self.unfinished_line: 75 content = "".join((self.unfinished_line, content)) 76 self.unfinished_line = "" 77 lines = content.split(end_mark) 78 if content.endswith(end_mark): 79 return lines[:-1] 80 else: 81 self.unfinished_line = lines[-1] 82 return lines[:-1] 83 84 def __read__(self, output): 85 self.output = "%s%s" % (self.output, output) 86 lines = self._process_output(output) 87 for line in lines: 88 line = line.strip() 89 if line: 90 LOG.info(get_decode(line)) 91 92 def __error__(self, message): 93 pass 94 95 def __done__(self, result_code="", message=""): 96 pass 97 98 99@dataclass 100class GTestConst(object): 101 exec_para_filter = "--gtest_filter" 102 exec_para_level = "--gtest_testsize" 103 exec_acts_para_filter = "--jstest_filter" 104 exec_acts_para_level = "--jstest_testsize" 105 106 107def get_device_log_file(report_path, serial=None, log_name="device_log"): 108 from xdevice import Variables 109 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 110 os.makedirs(log_path, exist_ok=True) 111 112 serial = serial or time.time_ns() 113 device_file_name = "{}_{}.log".format(log_name, serial) 114 device_log_file = os.path.join(log_path, device_file_name) 115 return device_log_file 116 117 118def get_level_para_string(level_string): 119 level_list = list(set(level_string.split(","))) 120 level_para_string = "" 121 for item in level_list: 122 if not item.isdigit(): 123 continue 124 item = item.strip(" ") 125 level_para_string = f"{level_para_string}Level{item}," 126 level_para_string = level_para_string.strip(",") 127 return level_para_string 128 129 130def get_result_savepath(testsuit_path, result_rootpath): 131 findkey = os.sep + "tests" + os.sep 132 filedir, _ = os.path.split(testsuit_path) 133 pos = filedir.find(findkey) 134 if -1 != pos: 135 subpath = filedir[pos + len(findkey):] 136 pos1 = subpath.find(os.sep) 137 if -1 != pos1: 138 subpath = subpath[pos1 + len(os.sep):] 139 result_path = os.path.join(result_rootpath, "result", subpath) 140 else: 141 result_path = os.path.join(result_rootpath, "result") 142 else: 143 result_path = os.path.join(result_rootpath, "result") 144 145 if not os.path.exists(result_path): 146 os.makedirs(result_path) 147 148 LOG.info("result_savepath = " + result_path) 149 return result_path 150 151 152# all testsuit common Unavailable test result xml 153def _create_empty_result_file(filepath, filename, error_message): 154 error_message = str(error_message) 155 error_message = error_message.replace("\"", "") 156 error_message = error_message.replace("<", "") 157 error_message = error_message.replace(">", "") 158 error_message = error_message.replace("&", "") 159 if filename.endswith(".hap"): 160 filename = filename.split(".")[0] 161 if not os.path.exists(filepath): 162 with open(filepath, "w", encoding='utf-8') as file_desc: 163 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 164 time.localtime()) 165 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 166 file_desc.write( 167 '<testsuites tests="0" failures="0" ' 168 'disabled="0" errors="0" timestamp="%s" ' 169 'time="0" name="AllTests">\n' % time_stamp) 170 file_desc.write( 171 ' <testsuite name="%s" tests="0" failures="0" ' 172 'disabled="0" errors="0" time="0.0" ' 173 'unavailable="1" message="%s">\n' % 174 (filename, error_message)) 175 file_desc.write(' </testsuite>\n') 176 file_desc.write('</testsuites>\n') 177 return 178 179 180def _unlock_screen(device): 181 device.execute_shell_command("svc power stayon true") 182 time.sleep(1) 183 184 185def _unlock_device(device): 186 device.execute_shell_command("input keyevent 82") 187 time.sleep(1) 188 device.execute_shell_command("wm dismiss-keyguard") 189 time.sleep(1) 190 191 192def _lock_screen(device): 193 device.execute_shell_command("svc power stayon false") 194 time.sleep(1) 195 196 197def disable_keyguard(device): 198 _unlock_screen(device) 199 _unlock_device(device) 200 201 202def _sleep_according_to_result(result): 203 if result: 204 time.sleep(1) 205 206def _create_fuzz_crash_file(filepath, filename): 207 if not os.path.exists(filepath): 208 with open(filepath, "w", encoding='utf-8') as file_desc: 209 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 210 time.localtime()) 211 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 212 file_desc.write( 213 '<testsuites disabled="0" name="AllTests" ' 214 'time="300" timestamp="%s" errors="0" ' 215 'failures="1" tests="1">\n' % time_stamp) 216 file_desc.write( 217 ' <testsuite disabled="0" name="%s" time="300" ' 218 'errors="0" failures="1" tests="1">\n' % filename) 219 file_desc.write( 220 ' <testcase name="%s" time="300" classname="%s" ' 221 'status="run">\n' % (filename, filename)) 222 file_desc.write( 223 ' <failure type="" ' 224 'message="Fuzzer crash. See ERROR in log file">\n') 225 file_desc.write(' </failure>\n') 226 file_desc.write(' </testcase>\n') 227 file_desc.write(' </testsuite>\n') 228 file_desc.write('</testsuites>\n') 229 return 230 231def _create_fuzz_pass_file(filepath, filename): 232 if not os.path.exists(filepath): 233 with open(filepath, "w", encoding='utf-8') as file_desc: 234 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 235 time.localtime()) 236 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 237 file_desc.write( 238 '<testsuites disabled="0" name="AllTests" ' 239 'time="300" timestamp="%s" errors="0" ' 240 'failures="0" tests="1">\n' % time_stamp) 241 file_desc.write( 242 ' <testsuite disabled="0" name="%s" time="300" ' 243 'errors="0" failures="0" tests="1">\n' % filename) 244 file_desc.write( 245 ' <testcase name="%s" time="300" classname="%s" ' 246 'status="run"/>\n' % (filename, filename)) 247 file_desc.write(' </testsuite>\n') 248 file_desc.write('</testsuites>\n') 249 return 250 251def _create_fuzz_result_file(filepath, filename, error_message): 252 error_message = str(error_message) 253 error_message = error_message.replace("\"", "") 254 error_message = error_message.replace("<", "") 255 error_message = error_message.replace(">", "") 256 error_message = error_message.replace("&", "") 257 if "AddressSanitizer" in error_message: 258 LOG.error("FUZZ TEST CRASH") 259 _create_fuzz_crash_file(filepath, filename) 260 elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', 261 error_message, re.M) is not None: 262 LOG.info("FUZZ TEST PASS") 263 _create_fuzz_pass_file(filepath, filename) 264 else: 265 LOG.error("FUZZ TEST UNAVAILABLE") 266 _create_empty_result_file(filepath, filename, error_message) 267 return 268############################################################################## 269############################################################################## 270 271class ResultManager(object): 272 def __init__(self, testsuit_path, config): 273 self.testsuite_path = testsuit_path 274 self.config = config 275 self.result_rootpath = self.config.report_path 276 self.device = self.config.device 277 if testsuit_path.endswith(".hap"): 278 self.device_testpath = self.config.test_hap_out_path 279 else: 280 self.device_testpath = self.config.target_test_path 281 self.testsuite_name = os.path.basename(self.testsuite_path) 282 self.is_coverage = False 283 284 def set_is_coverage(self, is_coverage): 285 self.is_coverage = is_coverage 286 287 def get_test_results(self, error_message=""): 288 # Get test result files 289 filepath = self.obtain_test_result_file() 290 if "fuzztest" == self.config.testtype[0]: 291 LOG.info("create fuzz test report") 292 _create_fuzz_result_file(filepath, self.testsuite_name, 293 error_message) 294 return filepath 295 if not os.path.exists(filepath): 296 _create_empty_result_file(filepath, self.testsuite_name, 297 error_message) 298 if "benchmark" == self.config.testtype[0]: 299 self._obtain_benchmark_result() 300 # Get coverage data files 301 if self.is_coverage: 302 self.obtain_coverage_data() 303 304 return filepath 305 306 def _obtain_benchmark_result(self): 307 benchmark_root_dir = os.path.abspath( 308 os.path.join(self.result_rootpath, "benchmark")) 309 benchmark_dir = os.path.abspath( 310 os.path.join(benchmark_root_dir, 311 self.get_result_sub_save_path(), 312 self.testsuite_name)) 313 314 if not os.path.exists(benchmark_dir): 315 os.makedirs(benchmark_dir) 316 317 LOG.info("benchmark_dir = %s" % benchmark_dir) 318 self.device.pull_file(os.path.join(self.device_testpath, 319 "%s.json" % self.testsuite_name), benchmark_dir) 320 if not os.path.exists(os.path.join(benchmark_dir, 321 "%s.json" % self.testsuite_name)): 322 os.rmdir(benchmark_dir) 323 return benchmark_dir 324 325 def get_result_sub_save_path(self): 326 find_key = os.sep + "benchmark" + os.sep 327 file_dir, _ = os.path.split(self.testsuite_path) 328 pos = file_dir.find(find_key) 329 subpath = "" 330 if -1 != pos: 331 subpath = file_dir[pos + len(find_key):] 332 LOG.info("subpath = " + subpath) 333 return subpath 334 335 def obtain_test_result_file(self): 336 result_save_path = get_result_savepath(self.testsuite_path, 337 self.result_rootpath) 338 result_file_path = os.path.join(result_save_path, 339 "%s.xml" % self.testsuite_name) 340 341 result_josn_file_path = os.path.join(result_save_path, 342 "%s.json" % self.testsuite_name) 343 344 if self.testsuite_path.endswith('.hap'): 345 remote_result_file = os.path.join(self.device_testpath, 346 "testcase_result.xml") 347 remote_json_result_file = os.path.join(self.device_testpath, 348 "%s.json" % self.testsuite_name) 349 else: 350 remote_result_file = os.path.join(self.device_testpath, 351 "%s.xml" % self.testsuite_name) 352 remote_json_result_file = os.path.join(self.device_testpath, 353 "%s.json" % self.testsuite_name) 354 355 if self.config.testtype[0] != "fuzztest": 356 if self.device.is_file_exist(remote_result_file): 357 self.device.pull_file(remote_result_file, result_file_path) 358 elif self.device.is_file_exist(remote_json_result_file): 359 self.device.pull_file(remote_json_result_file, 360 result_josn_file_path) 361 result_file_path = result_josn_file_path 362 else: 363 LOG.info("%s not exist", remote_result_file) 364 365 return result_file_path 366 367 def make_empty_result_file(self, error_message=""): 368 result_savepath = get_result_savepath(self.testsuite_path, 369 self.result_rootpath) 370 result_filepath = os.path.join(result_savepath, "%s.xml" % 371 self.testsuite_name) 372 if not os.path.exists(result_filepath): 373 _create_empty_result_file(result_filepath, 374 self.testsuite_name, error_message) 375 376 def is_exist_target_in_device(self, path, target): 377 if platform.system() == "Windows": 378 command = '\"ls -l %s | grep %s\"' % (path, target) 379 else: 380 command = "ls -l %s | grep %s" % (path, target) 381 382 check_result = False 383 stdout_info = self.device.execute_shell_command(command) 384 if stdout_info != "" and stdout_info.find(target) != -1: 385 check_result = True 386 return check_result 387 388 def obtain_coverage_data(self): 389 cov_root_dir = os.path.abspath(os.path.join( 390 self.result_rootpath, 391 "..", 392 "coverage", 393 "data", 394 "exec")) 395 396 target_name = "obj" 397 cxx_cov_path = os.path.abspath(os.path.join( 398 self.result_rootpath, 399 "..", 400 "coverage", 401 "data", 402 "cxx", 403 self.testsuite_name + '_' + self.config.testtype[0])) 404 405 if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): 406 if not os.path.exists(cxx_cov_path): 407 os.makedirs(cxx_cov_path) 408 self.config.device.execute_shell_command( 409 "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) 410 src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) 411 self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) 412 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 413 if platform.system() == "Windows": 414 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 415 process.communicate() 416 os.remove(tar_path) 417 else: 418 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 419 (tar_path, cxx_cov_path), shell=True) 420 subprocess.Popen("rm -rf %s" % tar_path, shell=True) 421 422 423############################################################################## 424############################################################################## 425 426@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 427class CppTestDriver(IDriver): 428 """ 429 CppTest is a Test that runs a native test package on given device. 430 """ 431 # test driver config 432 config = None 433 result = "" 434 435 def __check_environment__(self, device_options): 436 if len(device_options) == 1 and device_options[0].label is None: 437 return True 438 if len(device_options) != 1 or \ 439 device_options[0].label != DeviceLabelType.phone: 440 return False 441 return True 442 443 def __check_config__(self, config): 444 pass 445 446 def __result__(self): 447 return self.result if os.path.exists(self.result) else "" 448 449 def __execute__(self, request): 450 try: 451 self.config = request.config 452 self.config.target_test_path = DEFAULT_TEST_PATH 453 self.config.device = request.config.environment.devices[0] 454 455 suite_file = request.root.source.source_file 456 LOG.debug("Testsuite FilePath: %s" % suite_file) 457 458 if not suite_file: 459 LOG.error("test source '%s' not exists" % 460 request.root.source.source_string) 461 return 462 463 if not self.config.device: 464 result = ResultManager(suite_file, self.config) 465 result.set_is_coverage(False) 466 result.make_empty_result_file( 467 "No test device is found. ") 468 return 469 470 self.config.device.set_device_report_path(request.config.report_path) 471 self.config.device.device_log_collector.start_hilog_task() 472 self._init_gtest() 473 self._run_gtest(suite_file) 474 475 finally: 476 serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) 477 log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( 478 ":", "_")) 479 self.config.device.device_log_collector.stop_hilog_task(log_tar_file_name) 480 481 def _init_gtest(self): 482 self.config.device.connector_command("target mount") 483 self.config.device.execute_shell_command( 484 "rm -rf %s" % self.config.target_test_path) 485 self.config.device.execute_shell_command( 486 "mkdir -p %s" % self.config.target_test_path) 487 self.config.device.execute_shell_command( 488 "mount -o rw,remount,rw /") 489 if "fuzztest" == self.config.testtype[0]: 490 self.config.device.execute_shell_command( 491 "mkdir -p %s" % os.path.join(self.config.target_test_path, 492 "corpus")) 493 494 def _run_gtest(self, suite_file): 495 from xdevice import Variables 496 filename = os.path.basename(suite_file) 497 test_para = self._get_test_para(self.config.testcase, 498 self.config.testlevel, 499 self.config.testtype, 500 self.config.target_test_path, 501 suite_file, 502 filename) 503 is_coverage_test = True if self.config.coverage else False 504 505 # push testsuite file 506 self.config.device.push_file(suite_file, self.config.target_test_path) 507 self._push_corpus_if_exist(suite_file) 508 509 # push resource files 510 resource_manager = ResourceManager() 511 resource_data_dic, resource_dir = \ 512 resource_manager.get_resource_data_dic(suite_file) 513 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 514 self.config.device) 515 516 # execute testcase 517 if not self.config.coverage: 518 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( 519 self.config.target_test_path, 520 filename, 521 filename, 522 test_para) 523 else: 524 coverage_outpath = self.config.coverage_outpath 525 strip_num = len(coverage_outpath.split("/")) - 1 526 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \ 527 "GCOV_PREFIX_STRIP=%s ./%s %s" % \ 528 (self.config.target_test_path, 529 filename, 530 str(strip_num), 531 filename, 532 test_para) 533 534 result = ResultManager(suite_file, self.config) 535 result.set_is_coverage(is_coverage_test) 536 537 try: 538 # get result 539 display_receiver = DisplayOutputReceiver() 540 self.config.device.execute_shell_command( 541 command, 542 receiver=display_receiver, 543 timeout=TIME_OUT, 544 retry=0) 545 return_message = display_receiver.output 546 except (ExecuteTerminate, DeviceError) as exception: 547 return_message = str(exception.args) 548 549 self.result = result.get_test_results(return_message) 550 resource_manager.process_cleaner_data(resource_data_dic, 551 resource_dir, 552 self.config.device) 553 554 def _push_corpus_if_exist(self, suite_file): 555 if "fuzztest" == self.config.testtype[0]: 556 corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") 557 if not os.path.isdir(corpus_path): 558 return 559 560 corpus_dirs = [] 561 corpus_file_list = [] 562 563 for root, _, files in os.walk(corpus_path): 564 if not files: 565 continue 566 567 corpus_dir = root.split("corpus")[-1] 568 if corpus_dir != "": 569 corpus_dirs.append(corpus_dir) 570 571 for file in files: 572 corpus_file_list.append(os.path.normcase( 573 os.path.join(root, file))) 574 575 # mkdir corpus files dir 576 if corpus_dirs: 577 for corpus in corpus_dirs: 578 mkdir_corpus_command = f"shell; mkdir -p {corpus}" 579 self.config.device.connector_command(mkdir_corpus_command) 580 581 # push corpus file 582 if corpus_file_list: 583 for corpus_file in corpus_file_list: 584 self.config.device.push_file(corpus_file, 585 os.path.join(self.config.target_test_path, "corpus")) 586 587 @staticmethod 588 def _get_test_para(testcase, 589 testlevel, 590 testtype, 591 target_test_path, 592 suite_file, 593 filename): 594 if "benchmark" == testtype[0]: 595 test_para = (" --benchmark_out_format=json" 596 " --benchmark_out=%s%s.json") % ( 597 target_test_path, filename) 598 return test_para 599 600 if "" != testcase and "" == testlevel: 601 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 602 elif "" == testcase and "" != testlevel: 603 level_para = get_level_para_string(testlevel) 604 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 605 else: 606 test_para = "" 607 608 if "fuzztest" == testtype[0]: 609 cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( 610 suite_file), "project.xml")).get_fuzzer_config("fuzztest") 611 LOG.info("config list :%s" % str(cfg_list)) 612 test_para += "corpus -max_len=" + cfg_list[0] + \ 613 " -max_total_time=" + cfg_list[1] + \ 614 " -rss_limit_mb=" + cfg_list[2] 615 return test_para 616 617 618############################################################################## 619############################################################################## 620 621@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 622class JSUnitTestDriver(IDriver): 623 """ 624 JSUnitTestDriver is a Test that runs a native test package on given device. 625 """ 626 627 def __init__(self): 628 self.config = None 629 self.result = "" 630 self.start_time = None 631 self.ability_name = "" 632 self.package_name = "" 633 self.hilog = None 634 self.hilog_proc = None 635 636 def __check_environment__(self, device_options): 637 pass 638 639 def __check_config__(self, config): 640 pass 641 642 def __result__(self): 643 return self.result if os.path.exists(self.result) else "" 644 645 def __execute__(self, request): 646 try: 647 LOG.info("developertest driver") 648 self.result = os.path.join( 649 request.config.report_path, "result", 650 '.'.join((request.get_module_name(), "xml"))) 651 self.config = request.config 652 self.config.target_test_path = DEFAULT_TEST_PATH 653 self.config.device = request.config.environment.devices[0] 654 655 suite_file = request.root.source.source_file 656 if not suite_file: 657 LOG.error("test source '%s' not exists" % 658 request.root.source.source_string) 659 return 660 661 if not self.config.device: 662 result = ResultManager(suite_file, self.config) 663 result.set_is_coverage(False) 664 result.make_empty_result_file( 665 "No test device is found") 666 return 667 668 package_name, ability_name = self._get_package_and_ability_name( 669 suite_file) 670 self.package_name = package_name 671 self.ability_name = ability_name 672 self.config.test_hap_out_path = \ 673 "/data/data/%s/files/" % self.package_name 674 self.config.device.connector_command("shell hilog -r") 675 676 self.hilog = get_device_log_file( 677 request.config.report_path, 678 request.config.device.__get_serial__() + "_" + request. 679 get_module_name(), 680 "device_hilog") 681 682 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 683 0o755) 684 685 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 686 self.config.device.device_log_collector.add_log_address(None, self.hilog) 687 _, self.hilog_proc = self.config.device.device_log_collector. \ 688 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 689 self._init_jsunit_test() 690 self._run_jsunit(suite_file, self.hilog) 691 hilog_file_pipe.flush() 692 self.generate_console_output(self.hilog, request) 693 finally: 694 self.config.device.device_log_collector.remove_log_address(None, self.hilog) 695 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 696 697 def _init_jsunit_test(self): 698 self.config.device.connector_command("target mount") 699 self.config.device.execute_shell_command( 700 "rm -rf %s" % self.config.target_test_path) 701 self.config.device.execute_shell_command( 702 "mkdir -p %s" % self.config.target_test_path) 703 self.config.device.execute_shell_command( 704 "mount -o rw,remount,rw /") 705 706 def _run_jsunit(self, suite_file, device_log_file): 707 filename = os.path.basename(suite_file) 708 _, suffix_name = os.path.splitext(filename) 709 710 resource_manager = ResourceManager() 711 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 712 if suffix_name == ".hap": 713 json_file_path = suite_file.replace(".hap", ".json") 714 if os.path.exists(json_file_path): 715 timeout = self._get_json_shell_timeout(json_file_path) 716 else: 717 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 718 else: 719 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 720 resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) 721 main_result = self._install_hap(suite_file) 722 result = ResultManager(suite_file, self.config) 723 if main_result: 724 self._execute_hapfile_jsunittest() 725 try: 726 status = False 727 actiontime = JS_TIMEOUT 728 times = CYCLE_TIMES 729 if timeout: 730 actiontime = timeout 731 times = 1 732 device_log_file_open = os.open(device_log_file, os.O_RDONLY, stat.S_IWUSR | stat.S_IRUSR) 733 with os.fdopen(device_log_file_open, "r", encoding='utf-8') \ 734 as file_read_pipe: 735 for i in range(0, times): 736 if status: 737 break 738 else: 739 time.sleep(float(actiontime)) 740 start_time = int(time.time()) 741 while True: 742 data = file_read_pipe.readline() 743 if data.find("JSApp:") != -1 and data.find("[end] run suites end") != -1: 744 LOG.info("execute testcase successfully.") 745 status = True 746 break 747 if int(time.time()) - start_time > 5: 748 break 749 finally: 750 _lock_screen(self.config.device) 751 self._uninstall_hap(self.package_name) 752 else: 753 self.result = result.get_test_results("Error: install hap failed") 754 LOG.error("Error: install hap failed") 755 756 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) 757 758 def generate_console_output(self, device_log_file, request): 759 result_message = self.read_device_log(device_log_file) 760 761 report_name = request.get_module_name() 762 parsers = get_plugin( 763 Plugin.PARSER, CommonParserType.jsunit) 764 if parsers: 765 parsers = parsers[:1] 766 for listener in request.listeners: 767 listener.device_sn = self.config.device.device_sn 768 parser_instances = [] 769 770 for parser in parsers: 771 parser_instance = parser.__class__() 772 parser_instance.suites_name = report_name 773 parser_instance.suite_name = report_name 774 parser_instance.listeners = request.listeners 775 parser_instances.append(parser_instance) 776 handler = ShellHandler(parser_instances) 777 process_command_ret(result_message, handler) 778 779 def read_device_log(self, device_log_file): 780 device_log_file_open = os.open(device_log_file, os.O_RDONLY, 781 stat.S_IWUSR | stat.S_IRUSR) 782 783 result_message = "" 784 with os.fdopen(device_log_file_open, "r", encoding='utf-8') \ 785 as file_read_pipe: 786 while True: 787 data = file_read_pipe.readline() 788 if not data: 789 break 790 # only filter JSApp log 791 if data.find("JSApp:") != -1: 792 result_message += data 793 if data.find("[end] run suites end") != -1: 794 break 795 return result_message 796 797 def _execute_hapfile_jsunittest(self): 798 _unlock_screen(self.config.device) 799 _unlock_device(self.config.device) 800 801 try: 802 return_message = self.start_hap_execute() 803 except (ExecuteTerminate, DeviceError) as exception: 804 return_message = str(exception.args) 805 806 return return_message 807 808 def _install_hap(self, suite_file): 809 message = self.config.device.connector_command("install %s" % suite_file) 810 message = str(message).rstrip() 811 if message == "" or "success" in message: 812 return_code = True 813 if message != "": 814 LOG.info(message) 815 else: 816 return_code = False 817 if message != "": 818 LOG.warning(message) 819 820 _sleep_according_to_result(return_code) 821 return return_code 822 823 def start_hap_execute(self): 824 try: 825 command = "aa start -d 123 -a %s.MainAbility -b %s" \ 826 % (self.package_name, self.package_name) 827 self.start_time = time.time() 828 result_value = self.config.device.execute_shell_command( 829 command, timeout=TIME_OUT) 830 831 if "success" in str(result_value).lower(): 832 LOG.info("execute %s's testcase success. result value=%s" 833 % (self.package_name, result_value)) 834 else: 835 LOG.info("execute %s's testcase failed. result value=%s" 836 % (self.package_name, result_value)) 837 838 _sleep_according_to_result(result_value) 839 return_message = result_value 840 except (ExecuteTerminate, DeviceError) as exception: 841 return_message = exception.args 842 843 return return_message 844 845 def _uninstall_hap(self, package_name): 846 return_message = self.config.device.execute_shell_command( 847 "bm uninstall -n %s" % package_name) 848 _sleep_according_to_result(return_message) 849 return return_message 850 851 @staticmethod 852 def _get_acts_test_para(testcase, 853 testlevel, 854 testtype, 855 target_test_path, 856 suite_file, 857 filename): 858 if "actstest" == testtype[0]: 859 test_para = (" --actstest_out_format=json" 860 " --actstest_out=%s%s.json") % ( 861 target_test_path, filename) 862 return test_para 863 864 if "" != testcase and "" == testlevel: 865 test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) 866 elif "" == testcase and "" != testlevel: 867 level_para = get_level_para_string(testlevel) 868 test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) 869 else: 870 test_para = "" 871 return test_para 872 873 @classmethod 874 def _get_json_shell_timeout(cls, json_filepath): 875 test_timeout = 300 876 try: 877 with open(json_filepath, 'r') as json_file: 878 data_dic = json.load(json_file) 879 if not data_dic: 880 return test_timeout 881 else: 882 if "driver" in data_dic.keys(): 883 driver_dict = data_dic.get("driver") 884 if driver_dict and "test-timeout" in driver_dict.keys(): 885 test_timeout = int(driver_dict["shell-timeout"]) / 1000 886 return test_timeout 887 except JSONDecodeError: 888 return test_timeout 889 finally: 890 print(" get json shell timeout finally") 891 892 893 @staticmethod 894 def _get_package_and_ability_name(hap_filepath): 895 package_name = "" 896 ability_name = "" 897 if os.path.exists(hap_filepath): 898 filename = os.path.basename(hap_filepath) 899 900 #unzip the hap file 901 hap_bak_path = os.path.abspath(os.path.join( 902 os.path.dirname(hap_filepath), 903 "%s.bak" % filename)) 904 zf_desc = zipfile.ZipFile(hap_filepath) 905 try: 906 zf_desc.extractall(path=hap_bak_path) 907 except RuntimeError as error: 908 print(error) 909 zf_desc.close() 910 911 #verify config.json file 912 app_profile_path = os.path.join(hap_bak_path, "config.json") 913 if not os.path.exists(app_profile_path): 914 print("file %s not exist" % app_profile_path) 915 return package_name, ability_name 916 917 if os.path.isdir(app_profile_path): 918 print("%s is a folder, and not a file" % app_profile_path) 919 return package_name, ability_name 920 921 #get package_name and ability_name value 922 load_dict = {} 923 with open(app_profile_path, 'r') as load_f: 924 load_dict = json.load(load_f) 925 profile_list = load_dict.values() 926 for profile in profile_list: 927 package_name = profile.get("package") 928 if not package_name: 929 continue 930 abilities = profile.get("abilities") 931 for abilitie in abilities: 932 abilities_name = abilitie.get("name") 933 if abilities_name.startswith("."): 934 ability_name = package_name + abilities_name[ 935 abilities_name.find("."):] 936 else: 937 ability_name = abilities_name 938 break 939 break 940 941 #delete hap_bak_path 942 if os.path.exists(hap_bak_path): 943 shutil.rmtree(hap_bak_path) 944 else: 945 print("file %s not exist" % hap_filepath) 946 return package_name, ability_name 947