1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import re 22import shutil 23import subprocess 24import sys 25import time 26import platform 27import zipfile 28import stat 29import random 30import xml.etree.ElementTree as ET 31from dataclasses import dataclass 32from json import JSONDecodeError 33 34from xdevice import DeviceTestType, check_result_report 35from xdevice import DeviceLabelType 36from xdevice import CommonParserType 37from xdevice import ExecuteTerminate 38from xdevice import DeviceError 39from xdevice import ShellHandler 40 41from xdevice import IDriver 42from xdevice import platform_logger 43from xdevice import Plugin 44from xdevice import get_plugin 45from ohos.environment.dmlib import process_command_ret 46from core.utils import get_decode 47from core.utils import get_fuzzer_path 48from core.config.resource_manager import ResourceManager 49from core.config.config_manager import FuzzerConfigManager 50 51__all__ = [ 52 "CppTestDriver", 53 "JSUnitTestDriver", 54 "disable_keyguard", 55 "GTestConst"] 56 57LOG = platform_logger("Drivers") 58DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 59OBJ = "obj" 60_ACE_LOG_MARKER = " a0c0d0" 61TIME_OUT = 900 * 1000 62JS_TIMEOUT = 10 63CYCLE_TIMES = 50 64 65FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL 66MODES = stat.S_IWUSR | stat.S_IRUSR 67 68 69class CollectingOutputReceiver: 70 def __init__(self): 71 self.output = "" 72 73 def __read__(self, output): 74 self.output = "%s%s" % (self.output, output) 75 76 def __error__(self, message): 77 pass 78 79 def __done__(self, result_code="", message=""): 80 pass 81 82 83class DisplayOutputReceiver: 84 def __init__(self): 85 self.output = "" 86 self.unfinished_line = "" 87 88 def __read__(self, output): 89 self.output = "%s%s" % (self.output, output) 90 lines = self._process_output(output) 91 for line in lines: 92 line = line.strip() 93 if line: 94 LOG.info(get_decode(line)) 95 96 def __error__(self, message): 97 pass 98 99 def __done__(self, result_code="", message=""): 100 pass 101 102 def _process_output(self, output, end_mark="\n"): 103 content = output 104 if self.unfinished_line: 105 content = "".join((self.unfinished_line, content)) 106 self.unfinished_line = "" 107 lines = content.split(end_mark) 108 if content.endswith(end_mark): 109 return lines[:-1] 110 else: 111 self.unfinished_line = lines[-1] 112 return lines[:-1] 113 114 115@dataclass 116class GTestConst(object): 117 exec_para_filter = "--gtest_filter" 118 exec_para_level = "--gtest_testsize" 119 exec_acts_para_filter = "--jstest_filter" 120 exec_acts_para_level = "--jstest_testsize" 121 122 123def get_device_log_file(report_path, serial=None, log_name="device_log"): 124 from xdevice import Variables 125 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 126 os.makedirs(log_path, exist_ok=True) 127 128 serial = serial or time.time_ns() 129 device_file_name = "{}_{}.log".format(log_name, serial) 130 device_log_file = os.path.join(log_path, device_file_name) 131 return device_log_file 132 133 134def get_level_para_string(level_string): 135 level_list = list(set(level_string.split(","))) 136 level_para_string = "" 137 for item in level_list: 138 if not item.isdigit(): 139 continue 140 item = item.strip(" ") 141 level_para_string = f"{level_para_string}Level{item}," 142 level_para_string = level_para_string.strip(",") 143 return level_para_string 144 145 146def get_result_savepath(testsuit_path, result_rootpath): 147 findkey = os.sep + "tests" + os.sep 148 filedir, _ = os.path.split(testsuit_path) 149 pos = filedir.find(findkey) 150 if -1 != pos: 151 subpath = filedir[pos + len(findkey):] 152 pos1 = subpath.find(os.sep) 153 if -1 != pos1: 154 subpath = subpath[pos1 + len(os.sep):] 155 result_path = os.path.join(result_rootpath, "result", subpath) 156 else: 157 result_path = os.path.join(result_rootpath, "result") 158 else: 159 result_path = os.path.join(result_rootpath, "result") 160 161 if not os.path.exists(result_path): 162 os.makedirs(result_path) 163 164 LOG.info("result_savepath = " + result_path) 165 return result_path 166 167 168def get_test_log_savepath(result_rootpath, result_suit_path): 169 suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0] 170 test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path) 171 172 if not os.path.exists(test_log_path): 173 os.makedirs(test_log_path) 174 175 LOG.info("test_log_savepath = {}".format(test_log_path)) 176 return test_log_path 177 178 179def update_xml(suite_file, result_xml): 180 suite_path_txt = suite_file.split(".")[0] + "_path.txt" 181 if os.path.exists(suite_path_txt) and os.path.exists(result_xml): 182 with open(suite_path_txt, "r") as path_text: 183 line = path_text.readline().replace("//", "").strip() 184 tree = ET.parse(result_xml) 185 tree.getroot().attrib["path"] = line 186 tree.getroot().attrib["name"] = os.path.basename(suite_file.split(".")[0]) 187 188 test_suit = os.path.basename(result_xml).split(".")[0] 189 log_path = os.path.abspath(result_xml).split("result")[0] 190 crash_path = os.path.join(log_path, "log", test_suit) 191 all_items = os.listdir(crash_path) 192 193 # 筛选以crash开头的目录 194 matching_dirs = [os.path.join(crash_path, item) for item in all_items if 195 os.path.isdir(os.path.join(crash_path, item)) 196 and item.startswith(f"crash_log_{test_suit}")] 197 if len(matching_dirs) >= 1: 198 tree.getroot().attrib["is_crash"] = "1" 199 tree.write(result_xml) 200 201 202def remove_color_codes(text): 203 """ 204 color ascii to utf-8 205 Args: 206 text: 207 208 Returns: 209 210 """ 211 return text.encode('unicode_escape').decode('utf-8') 212 213 214# all testsuit common Unavailable test result xml 215def _create_empty_result_file(filepath, filename, error_message): 216 error_message = remove_color_codes(error_message) 217 error_message = error_message.replace("\"", "") 218 error_message = error_message.replace("<", "") 219 error_message = error_message.replace(">", "") 220 error_message = error_message.replace("&", "") 221 if filename.endswith(".hap"): 222 filename = filename.split(".")[0] 223 if not os.path.exists(filepath): 224 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 225 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 226 time.localtime()) 227 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 228 file_desc.write( 229 '<testsuites tests="0" failures="0" ' 230 'disabled="0" errors="0" timestamp="%s" ' 231 'time="0" name="%s" unavailable="1">\n' % (time_stamp, filename)) 232 file_desc.write( 233 ' <testsuite name="%s" tests="0" failures="0" ' 234 'disabled="0" errors="0" time="0.0" ' 235 'unavailable="1" message="%s">\n' % 236 (filename, error_message)) 237 file_desc.write(' </testsuite>\n') 238 file_desc.write('</testsuites>\n') 239 return 240 241 242def _unlock_screen(device): 243 device.execute_shell_command("svc power stayon true") 244 time.sleep(1) 245 246 247def _unlock_device(device): 248 device.execute_shell_command("input keyevent 82") 249 time.sleep(1) 250 device.execute_shell_command("wm dismiss-keyguard") 251 time.sleep(1) 252 253 254def _lock_screen(device): 255 device.execute_shell_command("svc power stayon false") 256 time.sleep(1) 257 258 259def disable_keyguard(device): 260 _unlock_screen(device) 261 _unlock_device(device) 262 263 264def _sleep_according_to_result(result): 265 if result: 266 time.sleep(1) 267 268 269def _create_fuzz_crash_file(filepath, filename): 270 if not os.path.exists(filepath): 271 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 272 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 273 time.localtime()) 274 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 275 file_desc.write( 276 '<testsuites disabled="0" name="%s" ' 277 'time="300" timestamp="%s" errors="0" ' 278 'failures="1" tests="1">\n' % (filename, time_stamp)) 279 file_desc.write( 280 ' <testsuite disabled="0" name="%s" time="300" ' 281 'errors="0" failures="1" tests="1">\n' % filename) 282 file_desc.write( 283 ' <testcase name="%s" time="300" classname="%s" ' 284 'status="run">\n' % (filename, filename)) 285 file_desc.write( 286 ' <failure type="" ' 287 'message="Fuzzer crash. See ERROR in log file">\n') 288 file_desc.write(' </failure>\n') 289 file_desc.write(' </testcase>\n') 290 file_desc.write(' </testsuite>\n') 291 file_desc.write('</testsuites>\n') 292 return 293 294 295def _create_fuzz_pass_file(filepath, filename): 296 if not os.path.exists(filepath): 297 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 298 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 299 time.localtime()) 300 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 301 file_desc.write( 302 '<testsuites disabled="0" name="%s" ' 303 'time="300" timestamp="%s" errors="0" ' 304 'failures="0" tests="1">\n' % (filename, time_stamp)) 305 file_desc.write( 306 ' <testsuite disabled="0" name="%s" time="300" ' 307 'errors="0" failures="0" tests="1">\n' % filename) 308 file_desc.write( 309 ' <testcase name="%s" time="300" classname="%s" ' 310 'status="run"/>\n' % (filename, filename)) 311 file_desc.write(' </testsuite>\n') 312 file_desc.write('</testsuites>\n') 313 return 314 315 316def _create_fuzz_result_file(filepath, filename, error_message): 317 error_message = str(error_message) 318 error_message = error_message.replace("\"", "") 319 error_message = error_message.replace("<", "") 320 error_message = error_message.replace(">", "") 321 error_message = error_message.replace("&", "") 322 if "AddressSanitizer" in error_message: 323 LOG.error("FUZZ TEST CRASH") 324 _create_fuzz_crash_file(filepath, filename) 325 elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', 326 error_message, re.M) is not None: 327 LOG.info("FUZZ TEST PASS") 328 _create_fuzz_pass_file(filepath, filename) 329 else: 330 LOG.error("FUZZ TEST UNAVAILABLE") 331 _create_empty_result_file(filepath, filename, error_message) 332 return 333 334 335class ResultManager(object): 336 def __init__(self, testsuit_path, config): 337 self.testsuite_path = testsuit_path 338 self.config = config 339 self.result_rootpath = self.config.report_path 340 self.device = self.config.device 341 if testsuit_path.endswith(".hap"): 342 self.device_testpath = self.config.test_hap_out_path 343 else: 344 self.device_testpath = self.config.target_test_path 345 self.testsuite_name = os.path.basename(self.testsuite_path) 346 self.is_coverage = False 347 348 def set_is_coverage(self, is_coverage): 349 self.is_coverage = is_coverage 350 351 def get_test_results(self, error_message=""): 352 # Get test result files 353 filepath, _ = self.obtain_test_result_file() 354 if "fuzztest" == self.config.testtype[0]: 355 LOG.info("create fuzz test report") 356 _create_fuzz_result_file(filepath, self.testsuite_name, 357 error_message) 358 if not self.is_coverage: 359 self._obtain_fuzz_corpus() 360 361 if not os.path.exists(filepath): 362 _create_empty_result_file(filepath, self.testsuite_name, 363 error_message) 364 if "benchmark" == self.config.testtype[0]: 365 self._obtain_benchmark_result() 366 # Get coverage data files 367 if self.is_coverage: 368 self.obtain_coverage_data() 369 370 return filepath 371 372 def get_test_results_hidelog(self, error_message=""): 373 # Get test result files 374 result_file_path, test_log_path = self.obtain_test_result_file() 375 log_content = "" 376 if not error_message: 377 if os.path.exists(test_log_path): 378 with open(test_log_path, "r") as log: 379 log_content = log.readlines() 380 else: 381 LOG.error("{}: Test log not exist.".format(test_log_path)) 382 else: 383 log_content = error_message 384 385 if "fuzztest" == self.config.testtype[0]: 386 LOG.info("create fuzz test report") 387 _create_fuzz_result_file(result_file_path, self.testsuite_name, 388 log_content) 389 if not self.is_coverage: 390 self._obtain_fuzz_corpus() 391 392 if not os.path.exists(result_file_path): 393 _create_empty_result_file(result_file_path, self.testsuite_name, 394 log_content) 395 if "benchmark" == self.config.testtype[0]: 396 self._obtain_benchmark_result() 397 # Get coverage data files 398 if self.is_coverage: 399 self.obtain_coverage_data() 400 401 return result_file_path 402 403 def get_result_sub_save_path(self): 404 find_key = os.sep + "benchmark" + os.sep 405 file_dir, _ = os.path.split(self.testsuite_path) 406 pos = file_dir.find(find_key) 407 subpath = "" 408 if -1 != pos: 409 subpath = file_dir[pos + len(find_key):] 410 LOG.info("subpath = " + subpath) 411 return subpath 412 413 def obtain_test_result_file(self): 414 result_save_path = get_result_savepath(self.testsuite_path, 415 self.result_rootpath) 416 417 result_file_path = os.path.join(result_save_path, 418 "%s.xml" % self.testsuite_name) 419 420 result_josn_file_path = os.path.join(result_save_path, 421 "%s.json" % self.testsuite_name) 422 423 if self.testsuite_path.endswith('.hap'): 424 remote_result_file = os.path.join(self.device_testpath, 425 "testcase_result.xml") 426 remote_json_result_file = os.path.join(self.device_testpath, 427 "%s.json" % self.testsuite_name) 428 else: 429 remote_result_file = os.path.join(self.device_testpath, 430 "%s.xml" % self.testsuite_name) 431 remote_json_result_file = os.path.join(self.device_testpath, 432 "%s.json" % self.testsuite_name) 433 434 if self.config.testtype[0] != "fuzztest": 435 if self.device.is_file_exist(remote_result_file): 436 self.device.pull_file(remote_result_file, result_file_path) 437 elif self.device.is_file_exist(remote_json_result_file): 438 self.device.pull_file(remote_json_result_file, 439 result_josn_file_path) 440 result_file_path = result_josn_file_path 441 else: 442 LOG.info("%s not exist", remote_result_file) 443 444 if self.config.hidelog: 445 remote_log_result_file = os.path.join(self.device_testpath, 446 "%s.log" % self.testsuite_name) 447 test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path) 448 test_log_file_path = os.path.join(test_log_save_path, 449 "%s.log" % self.testsuite_name) 450 self.device.pull_file(remote_log_result_file, test_log_file_path) 451 return result_file_path, test_log_file_path 452 453 return result_file_path, "" 454 455 def make_empty_result_file(self, error_message=""): 456 result_savepath = get_result_savepath(self.testsuite_path, 457 self.result_rootpath) 458 result_filepath = os.path.join(result_savepath, "%s.xml" % 459 self.testsuite_name) 460 if not os.path.exists(result_filepath): 461 _create_empty_result_file(result_filepath, 462 self.testsuite_name, error_message) 463 464 def is_exist_target_in_device(self, path, target): 465 if platform.system() == "Windows": 466 command = '\"ls -l %s | grep %s\"' % (path, target) 467 else: 468 command = "ls -l %s | grep %s" % (path, target) 469 470 check_result = False 471 stdout_info = self.device.execute_shell_command(command) 472 if stdout_info != "" and stdout_info.find(target) != -1: 473 check_result = True 474 return check_result 475 476 def obtain_coverage_data(self): 477 cov_root_dir = os.path.abspath(os.path.join( 478 self.result_rootpath, 479 "..", 480 "coverage", 481 "data", 482 "exec")) 483 484 485 tests_path = self.config.testcases_path 486 test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0] 487 cxx_cov_path = os.path.abspath(os.path.join( 488 self.result_rootpath, 489 "..", 490 "coverage", 491 "data", 492 "cxx", 493 self.testsuite_name + '_' + test_type)) 494 495 if os.path.basename(self.testsuite_name).startswith("rust_"): 496 target_name = "lib.unstripped" 497 else: 498 target_name = OBJ 499 if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): 500 if not os.path.exists(cxx_cov_path): 501 os.makedirs(cxx_cov_path) 502 else: 503 cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}" 504 self.config.device.execute_shell_command( 505 "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) 506 src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) 507 self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) 508 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 509 if platform.system() == "Windows": 510 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 511 process.communicate() 512 os.remove(tar_path) 513 os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)) 514 else: 515 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 516 (tar_path, cxx_cov_path), shell=True).communicate() 517 subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate() 518 if target_name != OBJ: 519 subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name), 520 os.path.join(cxx_cov_path, OBJ)), shell=True).communicate() 521 522 def _obtain_fuzz_corpus(self): 523 command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" 524 self.config.device.execute_shell_command(command) 525 result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) 526 LOG.info(f"fuzz_dir = {result_save_path}") 527 self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) 528 529 def _obtain_benchmark_result(self): 530 benchmark_root_dir = os.path.abspath( 531 os.path.join(self.result_rootpath, "benchmark")) 532 benchmark_dir = os.path.abspath( 533 os.path.join(benchmark_root_dir, 534 self.get_result_sub_save_path(), 535 self.testsuite_name)) 536 537 if not os.path.exists(benchmark_dir): 538 os.makedirs(benchmark_dir) 539 540 LOG.info("benchmark_dir = %s" % benchmark_dir) 541 self.device.pull_file(os.path.join(self.device_testpath, 542 "%s.json" % self.testsuite_name), benchmark_dir) 543 if not os.path.exists(os.path.join(benchmark_dir, 544 "%s.json" % self.testsuite_name)): 545 os.rmdir(benchmark_dir) 546 return benchmark_dir 547 548 549@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 550class CppTestDriver(IDriver): 551 """ 552 CppTest is a Test that runs a native test package on given device. 553 """ 554 # test driver config 555 config = None 556 result = "" 557 558 def __check_environment__(self, device_options): 559 if len(device_options) == 1 and device_options[0].label is None: 560 return True 561 if len(device_options) != 1 or \ 562 device_options[0].label != DeviceLabelType.phone: 563 return False 564 return True 565 566 def __check_config__(self, config): 567 pass 568 569 def __result__(self): 570 return self.result if os.path.exists(self.result) else "" 571 572 def __execute__(self, request): 573 try: 574 self.config = request.config 575 self.config.target_test_path = DEFAULT_TEST_PATH 576 self.config.device = request.config.environment.devices[0] 577 578 suite_file = request.root.source.source_file 579 LOG.debug("Testsuite FilePath: %s" % suite_file) 580 581 if not suite_file: 582 LOG.error("test source '%s' not exists" % 583 request.root.source.source_string) 584 return 585 586 if not self.config.device: 587 result = ResultManager(suite_file, self.config) 588 result.set_is_coverage(False) 589 result.make_empty_result_file( 590 "No test device is found. ") 591 return 592 593 self.config.device.set_device_report_path(request.config.report_path) 594 if request.config.hilogswitch != "0": 595 self.config.device.device_log_collector.start_hilog_task() 596 self._init_gtest() 597 self._run_gtest(suite_file) 598 599 finally: 600 log_path = get_result_savepath(request.root.source.source_file, request.config.report_path) 601 suit_name = os.path.basename(request.root.source.source_file) 602 xml_path = os.path.join(log_path, f"{suit_name}.xml") 603 if not os.path.exists(xml_path): 604 _create_empty_result_file(xml_path, suit_name, "ERROR") 605 serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) 606 log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( 607 ":", "_")) 608 try: 609 if request.config.hilogswitch != "0": 610 self.config.device.device_log_collector.stop_hilog_task( 611 log_tar_file_name, module_name=request.get_module_name()) 612 finally: 613 update_xml(request.root.source.source_file, xml_path) 614 615 @staticmethod 616 def _alter_init(name): 617 with open(name, "rb") as f: 618 lines = f.read() 619 str_content = lines.decode("utf-8") 620 621 pattern_sharp = '^\s*#.*$(\n|\r\n)' 622 pattern_star = '/\*.*?\*/(\n|\r\n)+' 623 pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+' 624 625 if re.match(pattern_sharp, str_content, flags=re.M): 626 striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) 627 elif re.findall(pattern_star, str_content, flags=re.S): 628 striped_content = re.sub(pattern_star, '', str_content, flags=re.S) 629 elif re.findall(pattern_xml, str_content, flags=re.S): 630 striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) 631 else: 632 striped_content = str_content 633 634 striped_bt = striped_content.encode("utf-8") 635 if os.path.exists(name): 636 os.remove(name) 637 with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: 638 f.write(striped_bt) 639 640 def _init_gtest(self): 641 self.config.device.connector_command("target mount") 642 self.config.device.execute_shell_command( 643 "rm -rf %s" % self.config.target_test_path) 644 self.config.device.execute_shell_command( 645 "mkdir -p %s" % self.config.target_test_path) 646 self.config.device.execute_shell_command( 647 "mount -o rw,remount,rw /") 648 if "fuzztest" == self.config.testtype[0]: 649 self.config.device.execute_shell_command( 650 "mkdir -p %s" % os.path.join(self.config.target_test_path, 651 "corpus")) 652 653 def _gtest_command(self, suite_file): 654 filename = os.path.basename(suite_file) 655 test_para = self._get_test_para(self.config.testcase, 656 self.config.testlevel, 657 self.config.testtype, 658 self.config.target_test_path, 659 suite_file, 660 filename, 661 self.config.iteration) 662 663 # execute testcase 664 if not self.config.coverage: 665 if self.config.random == "random": 666 seed = random.randint(1, 100) 667 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % ( 668 self.config.target_test_path, 669 filename, 670 filename, 671 test_para, 672 seed) 673 else: 674 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( 675 self.config.target_test_path, 676 filename, 677 filename, 678 test_para) 679 else: 680 coverage_outpath = self.config.coverage_outpath 681 if coverage_outpath: 682 strip_num = len(coverage_outpath.strip("/").split("/")) 683 else: 684 ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json") 685 with open(ohos_config_path, 'r') as json_file: 686 json_info = json.load(json_file) 687 out_path = json_info.get("out_path") 688 strip_num = len(out_path.strip("/").split("/")) 689 if "fuzztest" == self.config.testtype[0]: 690 self._push_corpus_cov_if_exist(suite_file) 691 command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ 692 rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \ 693 GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" 694 else: 695 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \ 696 "GCOV_PREFIX_STRIP=%s ./%s %s" % \ 697 (self.config.target_test_path, 698 filename, 699 DEFAULT_TEST_PATH, 700 str(strip_num), 701 filename, 702 test_para) 703 704 if self.config.hidelog: 705 command += " > {}.log 2>&1".format(filename) 706 707 return command 708 709 def _run_gtest(self, suite_file): 710 from xdevice import Variables 711 is_coverage_test = True if self.config.coverage else False 712 713 # push testsuite file 714 self.config.device.push_file(suite_file, self.config.target_test_path) 715 self.config.device.execute_shell_command( 716 "hilog -d %s" % (os.path.join(self.config.target_test_path, 717 os.path.basename(suite_file))) 718 ) 719 self._push_corpus_if_exist(suite_file) 720 721 # push resource files 722 resource_manager = ResourceManager() 723 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 724 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 725 self.config.device) 726 727 command = self._gtest_command(suite_file) 728 729 result = ResultManager(suite_file, self.config) 730 result.set_is_coverage(is_coverage_test) 731 732 try: 733 # get result 734 if self.config.hidelog: 735 return_message = "" 736 display_receiver = CollectingOutputReceiver() 737 self.config.device.execute_shell_command( 738 command, 739 receiver=display_receiver, 740 timeout=TIME_OUT, 741 retry=0) 742 else: 743 display_receiver = DisplayOutputReceiver() 744 self.config.device.execute_shell_command( 745 command, 746 receiver=display_receiver, 747 timeout=TIME_OUT, 748 retry=0) 749 return_message = display_receiver.output 750 except (ExecuteTerminate, DeviceError) as exception: 751 return_message = str(exception.args) 752 753 if self.config.hidelog: 754 self.result = result.get_test_results_hidelog(return_message) 755 else: 756 self.result = result.get_test_results(return_message) 757 758 resource_manager.process_cleaner_data(resource_data_dic, 759 resource_dir, 760 self.config.device) 761 762 def _push_corpus_cov_if_exist(self, suite_file): 763 corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep) 764 cov_file = os.path.join( 765 sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz") 766 LOG.info("corpus_cov file :%s" % str(cov_file)) 767 self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) 768 769 def _push_corpus_if_exist(self, suite_file): 770 if "fuzztest" == self.config.testtype[0]: 771 corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") 772 if not os.path.isdir(corpus_path): 773 return 774 775 corpus_dirs = [] 776 corpus_file_list = [] 777 778 for root, _, files in os.walk(corpus_path): 779 if not files: 780 continue 781 782 corpus_dir = root.split("corpus")[-1] 783 if corpus_dir != "": 784 corpus_dirs.append(corpus_dir) 785 786 for file in files: 787 cp_file = os.path.normcase(os.path.join(root, file)) 788 corpus_file_list.append(cp_file) 789 if file == "init": 790 self._alter_init(cp_file) 791 792 # mkdir corpus files dir 793 if corpus_dirs: 794 for corpus in corpus_dirs: 795 mkdir_corpus_command = f"shell; mkdir -p {corpus}" 796 self.config.device.connector_command(mkdir_corpus_command) 797 798 # push corpus file 799 if corpus_file_list: 800 for corpus_file in corpus_file_list: 801 self.config.device.push_file(corpus_file, 802 os.path.join(self.config.target_test_path, "corpus")) 803 804 def _get_test_para(self, 805 testcase, 806 testlevel, 807 testtype, 808 target_test_path, 809 suite_file, 810 filename, 811 iteration): 812 if "benchmark" == testtype[0]: 813 test_para = (" --benchmark_out_format=json" 814 " --benchmark_out=%s%s.json") % ( 815 target_test_path, filename) 816 return test_para 817 818 if "" != testcase and "" == testlevel: 819 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 820 elif "" == testcase and "" != testlevel: 821 level_para = get_level_para_string(testlevel) 822 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 823 else: 824 test_para = "" 825 826 if iteration: 827 test_para += f" --gtest_repeat={iteration}" 828 829 if "fuzztest" == testtype[0]: 830 cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( 831 suite_file), "project.xml")).get_fuzzer_config("fuzztest") 832 LOG.info("config list :%s" % str(cfg_list)) 833 if self.config.coverage: 834 test_para += "corpus -runs=0" + \ 835 " -max_len=" + cfg_list[0] + \ 836 " -max_total_time=" + cfg_list[1] + \ 837 " -rss_limit_mb=" + cfg_list[2] 838 else: 839 test_para += "corpus -max_len=" + cfg_list[0] + \ 840 " -max_total_time=" + cfg_list[1] + \ 841 " -rss_limit_mb=" + cfg_list[2] 842 843 return test_para 844 845 846@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 847class JSUnitTestDriver(IDriver): 848 """ 849 JSUnitTestDriver is a Test that runs a native test package on given device. 850 """ 851 852 def __init__(self): 853 self.config = None 854 self.result = "" 855 self.start_time = None 856 self.ability_name = "" 857 self.package_name = "" 858 # log 859 self.hilog = None 860 self.hilog_proc = None 861 862 def __check_environment__(self, device_options): 863 pass 864 865 def __check_config__(self, config): 866 pass 867 868 def __result__(self): 869 return self.result if os.path.exists(self.result) else "" 870 871 def __execute__(self, request): 872 try: 873 LOG.info("developer_test driver") 874 self.config = request.config 875 self.config.target_test_path = DEFAULT_TEST_PATH 876 self.config.device = request.config.environment.devices[0] 877 878 suite_file = request.root.source.source_file 879 result_save_path = get_result_savepath(suite_file, self.config.report_path) 880 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 881 if not suite_file: 882 LOG.error("test source '%s' not exists" % 883 request.root.source.source_string) 884 return 885 886 if not self.config.device: 887 result = ResultManager(suite_file, self.config) 888 result.set_is_coverage(False) 889 result.make_empty_result_file( 890 "No test device is found") 891 return 892 893 package_name, ability_name = self._get_package_and_ability_name( 894 suite_file) 895 self.package_name = package_name 896 self.ability_name = ability_name 897 self.config.test_hap_out_path = \ 898 "/data/data/%s/files/" % self.package_name 899 self.config.device.connector_command("shell hilog -r") 900 901 self.hilog = get_device_log_file( 902 request.config.report_path, 903 request.config.device.__get_serial__() + "_" + request. 904 get_module_name(), 905 "device_hilog") 906 907 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 908 0o755) 909 910 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 911 self.config.device.device_log_collector.add_log_address(None, self.hilog) 912 _, self.hilog_proc = self.config.device.device_log_collector.\ 913 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 914 self._init_jsunit_test() 915 self._run_jsunit(suite_file, self.hilog) 916 hilog_file_pipe.flush() 917 self.generate_console_output(self.hilog, request) 918 xml_path = os.path.join( 919 request.config.report_path, "result", 920 '.'.join((request.get_module_name(), "xml"))) 921 shutil.move(xml_path, self.result) 922 finally: 923 self.config.device.device_log_collector.remove_log_address(None, self.hilog) 924 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 925 update_xml(request.root.source.source_file, self.result) 926 927 @staticmethod 928 def _get_acts_test_para(testcase, 929 testlevel, 930 testtype, 931 target_test_path, 932 suite_file, 933 filename): 934 if "actstest" == testtype[0]: 935 test_para = (" --actstest_out_format=json" 936 " --actstest_out=%s%s.json") % ( 937 target_test_path, filename) 938 return test_para 939 940 if "" != testcase and "" == testlevel: 941 test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) 942 elif "" == testcase and "" != testlevel: 943 level_para = get_level_para_string(testlevel) 944 test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) 945 else: 946 test_para = "" 947 return test_para 948 949 @staticmethod 950 def _get_hats_test_para(testcase, 951 testlevel, 952 testtype, 953 target_test_path, 954 suite_file, 955 filename): 956 if "hatstest" == testtype[0]: 957 test_hats_para = (" --hatstest_out_format=json" 958 " --hatstest_out=%s%s.json") % ( 959 target_test_path, filename) 960 return test_hats_para 961 962 if "" != testcase and "" == testlevel: 963 test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 964 elif "" == testcase and "" != testlevel: 965 level_para = get_level_para_string(testlevel) 966 test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 967 else: 968 test_hats_para = "" 969 return test_hats_para 970 971 @classmethod 972 def _get_json_shell_timeout(cls, json_filepath): 973 test_timeout = 0 974 try: 975 with open(json_filepath, 'r') as json_file: 976 data_dic = json.load(json_file) 977 if not data_dic: 978 return test_timeout 979 else: 980 if "driver" in data_dic.keys(): 981 driver_dict = data_dic.get("driver") 982 if driver_dict and "test-timeout" in driver_dict.keys(): 983 test_timeout = int(driver_dict["shell-timeout"]) / 1000 984 return test_timeout 985 except JSONDecodeError: 986 return test_timeout 987 finally: 988 print(" get json shell timeout finally") 989 990 @staticmethod 991 def _get_package_and_ability_name(hap_filepath): 992 package_name = "" 993 ability_name = "" 994 if os.path.exists(hap_filepath): 995 filename = os.path.basename(hap_filepath) 996 997 # unzip the hap file 998 hap_bak_path = os.path.abspath(os.path.join( 999 os.path.dirname(hap_filepath), 1000 "%s.bak" % filename)) 1001 zf_desc = zipfile.ZipFile(hap_filepath) 1002 try: 1003 zf_desc.extractall(path=hap_bak_path) 1004 except RuntimeError as error: 1005 print("Unzip error: ", hap_bak_path) 1006 zf_desc.close() 1007 1008 # verify config.json file 1009 app_profile_path = os.path.join(hap_bak_path, "config.json") 1010 if not os.path.exists(app_profile_path): 1011 print("file %s not exist" % app_profile_path) 1012 return package_name, ability_name 1013 1014 if os.path.isdir(app_profile_path): 1015 print("%s is a folder, and not a file" % app_profile_path) 1016 return package_name, ability_name 1017 1018 # get package_name and ability_name value 1019 load_dict = {} 1020 with open(app_profile_path, 'r') as load_f: 1021 load_dict = json.load(load_f) 1022 profile_list = load_dict.values() 1023 for profile in profile_list: 1024 package_name = profile.get("package") 1025 if not package_name: 1026 continue 1027 abilities = profile.get("abilities") 1028 for abilitie in abilities: 1029 abilities_name = abilitie.get("name") 1030 if abilities_name.startswith("."): 1031 ability_name = package_name + abilities_name[ 1032 abilities_name.find("."):] 1033 else: 1034 ability_name = abilities_name 1035 break 1036 break 1037 1038 # delete hap_bak_path 1039 if os.path.exists(hap_bak_path): 1040 shutil.rmtree(hap_bak_path) 1041 else: 1042 print("file %s not exist" % hap_filepath) 1043 return package_name, ability_name 1044 1045 def generate_console_output(self, device_log_file, request): 1046 result_message = self.read_device_log(device_log_file) 1047 1048 report_name = request.get_module_name() 1049 parsers = get_plugin( 1050 Plugin.PARSER, CommonParserType.jsunit) 1051 if parsers: 1052 parsers = parsers[:1] 1053 for listener in request.listeners: 1054 listener.device_sn = self.config.device.device_sn 1055 parser_instances = [] 1056 1057 for parser in parsers: 1058 parser_instance = parser.__class__() 1059 parser_instance.suites_name = report_name 1060 parser_instance.suite_name = report_name 1061 parser_instance.listeners = request.listeners 1062 parser_instances.append(parser_instance) 1063 handler = ShellHandler(parser_instances) 1064 process_command_ret(result_message, handler) 1065 1066 def read_device_log(self, device_log_file): 1067 result_message = "" 1068 with open(device_log_file, "r", encoding='utf-8', 1069 errors='ignore') as file_read_pipe: 1070 while True: 1071 data = file_read_pipe.readline() 1072 if not data: 1073 break 1074 # only filter JSApp log 1075 if data.lower().find(_ACE_LOG_MARKER) != -1: 1076 result_message += data 1077 if data.find("[end] run suites end") != -1: 1078 break 1079 return result_message 1080 1081 def start_hap_execute(self): 1082 try: 1083 command = "aa start -d 123 -a %s.MainAbility -b %s" \ 1084 % (self.package_name, self.package_name) 1085 self.start_time = time.time() 1086 result_value = self.config.device.execute_shell_command( 1087 command, timeout=TIME_OUT) 1088 1089 if "success" in str(result_value).lower(): 1090 LOG.info("execute %s's testcase success. result value=%s" 1091 % (self.package_name, result_value)) 1092 else: 1093 LOG.info("execute %s's testcase failed. result value=%s" 1094 % (self.package_name, result_value)) 1095 1096 _sleep_according_to_result(result_value) 1097 return_message = result_value 1098 except (ExecuteTerminate, DeviceError) as exception: 1099 return_message = exception.args 1100 1101 return return_message 1102 1103 def _init_jsunit_test(self): 1104 self.config.device.connector_command("target mount") 1105 self.config.device.execute_shell_command( 1106 "rm -rf %s" % self.config.target_test_path) 1107 self.config.device.execute_shell_command( 1108 "mkdir -p %s" % self.config.target_test_path) 1109 self.config.device.execute_shell_command( 1110 "mount -o rw,remount,rw /") 1111 1112 def _run_jsunit(self, suite_file, device_log_file): 1113 filename = os.path.basename(suite_file) 1114 _, suffix_name = os.path.splitext(filename) 1115 1116 resource_manager = ResourceManager() 1117 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 1118 if suffix_name == ".hap": 1119 json_file_path = suite_file.replace(".hap", ".json") 1120 if os.path.exists(json_file_path): 1121 timeout = self._get_json_shell_timeout(json_file_path) 1122 else: 1123 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1124 else: 1125 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1126 resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) 1127 main_result = self._install_hap(suite_file) 1128 result = ResultManager(suite_file, self.config) 1129 if main_result: 1130 self._execute_hapfile_jsunittest() 1131 try: 1132 status = False 1133 actiontime = JS_TIMEOUT 1134 times = CYCLE_TIMES 1135 if timeout: 1136 actiontime = timeout 1137 times = 1 1138 with open(device_log_file, "r", encoding='utf-8', 1139 errors='ignore') as file_read_pipe: 1140 for i in range(0, times): 1141 if status: 1142 break 1143 else: 1144 time.sleep(float(actiontime)) 1145 start_time = int(time.time()) 1146 while True: 1147 data = file_read_pipe.readline() 1148 if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1: 1149 LOG.info("execute testcase successfully.") 1150 status = True 1151 break 1152 if int(time.time()) - start_time > 5: 1153 break 1154 finally: 1155 _lock_screen(self.config.device) 1156 self._uninstall_hap(self.package_name) 1157 else: 1158 self.result = result.get_test_results("Error: install hap failed") 1159 LOG.error("Error: install hap failed") 1160 1161 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) 1162 1163 def _execute_hapfile_jsunittest(self): 1164 _unlock_screen(self.config.device) 1165 _unlock_device(self.config.device) 1166 1167 try: 1168 return_message = self.start_hap_execute() 1169 except (ExecuteTerminate, DeviceError) as exception: 1170 return_message = str(exception.args) 1171 1172 return return_message 1173 1174 def _install_hap(self, suite_file): 1175 message = self.config.device.connector_command("install %s" % suite_file) 1176 message = str(message).rstrip() 1177 if message == "" or "success" in message: 1178 return_code = True 1179 if message != "": 1180 LOG.info(message) 1181 else: 1182 return_code = False 1183 if message != "": 1184 LOG.warning(message) 1185 1186 _sleep_according_to_result(return_code) 1187 return return_code 1188 1189 def _uninstall_hap(self, package_name): 1190 return_message = self.config.device.execute_shell_command( 1191 "bm uninstall -n %s" % package_name) 1192 _sleep_according_to_result(return_message) 1193 return return_message 1194 1195 1196@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 1197class OHRustTestDriver(IDriver): 1198 def __init__(self): 1199 self.result = "" 1200 self.error_message = "" 1201 self.config = None 1202 1203 def __check_environment__(self, device_options): 1204 pass 1205 1206 def __check_config__(self, config): 1207 pass 1208 1209 def __execute__(self, request): 1210 try: 1211 LOG.debug("Start to execute open harmony rust test") 1212 self.config = request.config 1213 self.config.device = request.config.environment.devices[0] 1214 self.config.target_test_path = DEFAULT_TEST_PATH 1215 suite_file = request.root.source.source_file 1216 LOG.debug("Testsuite filepath:{}".format(suite_file)) 1217 1218 if not suite_file: 1219 LOG.error("test source '{}' not exists".format( 1220 request.root.source.source_string)) 1221 return 1222 1223 result_save_path = get_result_savepath(suite_file, self.config.report_path) 1224 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 1225 self.config.device.set_device_report_path(request.config.report_path) 1226 self.config.device.device_log_collector.start_hilog_task() 1227 self._init_oh_rust() 1228 self._run_oh_rust(suite_file, request) 1229 except Exception as exception: 1230 self.error_message = exception 1231 if not getattr(exception, "error_no", ""): 1232 setattr(exception, "error_no", "03409") 1233 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1234 finally: 1235 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 1236 time.time_ns()) 1237 log_tar_file_name = "{}_{}".format( 1238 request.get_module_name(), str(serial).replace(":", "_")) 1239 try: 1240 self.config.device.device_log_collector.stop_hilog_task( 1241 log_tar_file_name, module_name=request.get_module_name()) 1242 finally: 1243 xml_path = os.path.join( 1244 request.config.report_path, "result", 1245 '.'.join((request.get_module_name(), "xml"))) 1246 shutil.move(xml_path, self.result) 1247 update_xml(request.root.source.source_file, self.result) 1248 self.result = check_result_report( 1249 request.config.report_path, self.result, self.error_message) 1250 1251 def __result__(self): 1252 return self.result if os.path.exists(self.result) else "" 1253 1254 def _init_oh_rust(self): 1255 self.config.device.connector_command("target mount") 1256 self.config.device.execute_shell_command( 1257 "rm -rf %s" % self.config.target_test_path) 1258 self.config.device.execute_shell_command( 1259 "mkdir -p %s" % self.config.target_test_path) 1260 self.config.device.execute_shell_command( 1261 "mount -o rw,remount,rw /") 1262 1263 def _run_oh_rust(self, suite_file, request=None): 1264 self.config.device.push_file(suite_file, self.config.target_test_path) 1265 self.config.device.execute_shell_command( 1266 "hilog -d %s" % (os.path.join(self.config.target_test_path, 1267 os.path.basename(suite_file))) 1268 ) 1269 resource_manager = ResourceManager() 1270 resource_data_dict, resource_dir = \ 1271 resource_manager.get_resource_data_dic(suite_file) 1272 resource_manager.process_preparer_data(resource_data_dict, 1273 resource_dir, 1274 self.config.device) 1275 for listener in request.listeners: 1276 listener.device_sn = self.config.device.device_sn 1277 1278 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 1279 if parsers: 1280 parsers = parsers[:1] 1281 parser_instances = [] 1282 for parser in parsers: 1283 parser_instance = parser.__class__() 1284 parser_instance.suite_name = request.get_module_name() 1285 parser_instance.listeners = request.listeners 1286 parser_instances.append(parser_instance) 1287 handler = ShellHandler(parser_instances) 1288 if self.config.coverage: 1289 command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format( 1290 self.config.target_test_path, os.path.basename(suite_file)) 1291 else: 1292 command = "cd {}; chmod +x *; ./{}".format( 1293 self.config.target_test_path, os.path.basename(suite_file)) 1294 self.config.device.execute_shell_command( 1295 command, timeout=TIME_OUT, receiver=handler, retry=0) 1296 if self.config.coverage: 1297 result = ResultManager(suite_file, self.config) 1298 result.obtain_coverage_data() 1299 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 1300 self.config.device) 1301