1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import re 22import shutil 23import subprocess 24import sys 25import time 26import platform 27import zipfile 28import stat 29import random 30import xml.etree.ElementTree as ET 31from dataclasses import dataclass 32from json import JSONDecodeError 33 34from xdevice import DeviceTestType, check_result_report 35from xdevice import DeviceLabelType 36from xdevice import CommonParserType 37from xdevice import ExecuteTerminate 38from xdevice import DeviceError 39from xdevice import ShellHandler 40 41from xdevice import IDriver 42from xdevice import platform_logger 43from xdevice import Plugin 44from xdevice import get_plugin 45from ohos.environment.dmlib import process_command_ret 46from core.utils import get_decode 47from core.utils import get_fuzzer_path 48from core.config.resource_manager import ResourceManager 49from core.config.config_manager import FuzzerConfigManager 50 51__all__ = [ 52 "CppTestDriver", 53 "JSUnitTestDriver", 54 "disable_keyguard", 55 "GTestConst"] 56 57LOG = platform_logger("Drivers") 58DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 59OBJ = "obj" 60_ACE_LOG_MARKER = " a0c0d0" 61TIME_OUT = 900 * 1000 62JS_TIMEOUT = 10 63CYCLE_TIMES = 50 64 65FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL 66MODES = stat.S_IWUSR | stat.S_IRUSR 67 68 69class CollectingOutputReceiver: 70 def __init__(self): 71 self.output = "" 72 73 def __read__(self, output): 74 self.output = "%s%s" % (self.output, output) 75 76 def __error__(self, message): 77 pass 78 79 def __done__(self, result_code="", message=""): 80 pass 81 82 83class DisplayOutputReceiver: 84 def __init__(self): 85 self.output = "" 86 self.unfinished_line = "" 87 88 def __read__(self, output): 89 self.output = "%s%s" % (self.output, output) 90 lines = self._process_output(output) 91 for line in lines: 92 line = line.strip() 93 if line: 94 LOG.info(get_decode(line)) 95 96 def __error__(self, message): 97 pass 98 99 def __done__(self, result_code="", message=""): 100 pass 101 102 def _process_output(self, output, end_mark="\n"): 103 content = output 104 if self.unfinished_line: 105 content = "".join((self.unfinished_line, content)) 106 self.unfinished_line = "" 107 lines = content.split(end_mark) 108 if content.endswith(end_mark): 109 return lines[:-1] 110 else: 111 self.unfinished_line = lines[-1] 112 return lines[:-1] 113 114 115@dataclass 116class GTestConst(object): 117 exec_para_filter = "--gtest_filter" 118 exec_para_level = "--gtest_testsize" 119 exec_acts_para_filter = "--jstest_filter" 120 exec_acts_para_level = "--jstest_testsize" 121 122 123def get_device_log_file(report_path, serial=None, log_name="device_log"): 124 from xdevice import Variables 125 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 126 os.makedirs(log_path, exist_ok=True) 127 128 serial = serial or time.time_ns() 129 device_file_name = "{}_{}.log".format(log_name, serial) 130 device_log_file = os.path.join(log_path, device_file_name) 131 return device_log_file 132 133 134def get_level_para_string(level_string): 135 level_list = list(set(level_string.split(","))) 136 level_para_string = "" 137 for item in level_list: 138 if not item.isdigit(): 139 continue 140 item = item.strip(" ") 141 level_para_string = f"{level_para_string}Level{item}," 142 level_para_string = level_para_string.strip(",") 143 return level_para_string 144 145 146def get_result_savepath(testsuit_path, result_rootpath): 147 findkey = os.sep + "tests" + os.sep 148 filedir, _ = os.path.split(testsuit_path) 149 pos = filedir.find(findkey) 150 if -1 != pos: 151 subpath = filedir[pos + len(findkey):] 152 pos1 = subpath.find(os.sep) 153 if -1 != pos1: 154 subpath = subpath[pos1 + len(os.sep):] 155 result_path = os.path.join(result_rootpath, "result", subpath) 156 else: 157 result_path = os.path.join(result_rootpath, "result") 158 else: 159 result_path = os.path.join(result_rootpath, "result") 160 161 if not os.path.exists(result_path): 162 os.makedirs(result_path) 163 164 LOG.info("result_savepath = " + result_path) 165 return result_path 166 167 168def get_test_log_savepath(result_rootpath, result_suit_path): 169 suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0] 170 test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path) 171 172 if not os.path.exists(test_log_path): 173 os.makedirs(test_log_path) 174 175 LOG.info("test_log_savepath = {}".format(test_log_path)) 176 return test_log_path 177 178 179def update_xml(suite_file, result_xml): 180 suite_path_txt = suite_file.split(".")[0] + "_path.txt" 181 if os.path.exists(suite_path_txt) and os.path.exists(result_xml): 182 with open(suite_path_txt, "r") as path_text: 183 line = path_text.readline().replace("//", "").strip() 184 tree = ET.parse(result_xml) 185 tree.getroot().attrib["path"] = line 186 tree.getroot().attrib["name"] = os.path.basename(suite_file.split(".")[0]) 187 188 test_suit = os.path.basename(result_xml).split(".")[0] 189 log_path = os.path.abspath(result_xml).split("result")[0] 190 crash_path = os.path.join(log_path, "log", test_suit) 191 all_items = os.listdir(crash_path) 192 193 # 筛选以crash开头的目录 194 matching_dirs = [os.path.join(crash_path, item) for item in all_items if 195 os.path.isdir(os.path.join(crash_path, item)) 196 and item.startswith(f"crash_log_{test_suit}")] 197 if len(matching_dirs) >= 1: 198 tree.getroot().attrib["is_crash"] = "1" 199 tree.write(result_xml) 200 201 202# all testsuit common Unavailable test result xml 203def _create_empty_result_file(filepath, filename, error_message): 204 error_message = str(error_message) 205 error_message = error_message.replace("\"", "") 206 error_message = error_message.replace("<", "") 207 error_message = error_message.replace(">", "") 208 error_message = error_message.replace("&", "") 209 if filename.endswith(".hap"): 210 filename = filename.split(".")[0] 211 if not os.path.exists(filepath): 212 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 213 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 214 time.localtime()) 215 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 216 file_desc.write( 217 '<testsuites tests="0" failures="0" ' 218 'disabled="0" errors="0" timestamp="%s" ' 219 'time="0" name="%s" unavailable="1">\n' % (time_stamp, filename)) 220 file_desc.write( 221 ' <testsuite name="%s" tests="0" failures="0" ' 222 'disabled="0" errors="0" time="0.0" ' 223 'unavailable="1" message="%s">\n' % 224 (filename, error_message)) 225 file_desc.write(' </testsuite>\n') 226 file_desc.write('</testsuites>\n') 227 return 228 229 230def _unlock_screen(device): 231 device.execute_shell_command("svc power stayon true") 232 time.sleep(1) 233 234 235def _unlock_device(device): 236 device.execute_shell_command("input keyevent 82") 237 time.sleep(1) 238 device.execute_shell_command("wm dismiss-keyguard") 239 time.sleep(1) 240 241 242def _lock_screen(device): 243 device.execute_shell_command("svc power stayon false") 244 time.sleep(1) 245 246 247def disable_keyguard(device): 248 _unlock_screen(device) 249 _unlock_device(device) 250 251 252def _sleep_according_to_result(result): 253 if result: 254 time.sleep(1) 255 256 257def _create_fuzz_crash_file(filepath, filename): 258 if not os.path.exists(filepath): 259 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 260 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 261 time.localtime()) 262 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 263 file_desc.write( 264 '<testsuites disabled="0" name="%s" ' 265 'time="300" timestamp="%s" errors="0" ' 266 'failures="1" tests="1">\n' % (filename, time_stamp)) 267 file_desc.write( 268 ' <testsuite disabled="0" name="%s" time="300" ' 269 'errors="0" failures="1" tests="1">\n' % filename) 270 file_desc.write( 271 ' <testcase name="%s" time="300" classname="%s" ' 272 'status="run">\n' % (filename, filename)) 273 file_desc.write( 274 ' <failure type="" ' 275 'message="Fuzzer crash. See ERROR in log file">\n') 276 file_desc.write(' </failure>\n') 277 file_desc.write(' </testcase>\n') 278 file_desc.write(' </testsuite>\n') 279 file_desc.write('</testsuites>\n') 280 return 281 282 283def _create_fuzz_pass_file(filepath, filename): 284 if not os.path.exists(filepath): 285 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 286 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 287 time.localtime()) 288 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 289 file_desc.write( 290 '<testsuites disabled="0" name="%s" ' 291 'time="300" timestamp="%s" errors="0" ' 292 'failures="0" tests="1">\n' % (filename, time_stamp)) 293 file_desc.write( 294 ' <testsuite disabled="0" name="%s" time="300" ' 295 'errors="0" failures="0" tests="1">\n' % filename) 296 file_desc.write( 297 ' <testcase name="%s" time="300" classname="%s" ' 298 'status="run"/>\n' % (filename, filename)) 299 file_desc.write(' </testsuite>\n') 300 file_desc.write('</testsuites>\n') 301 return 302 303 304def _create_fuzz_result_file(filepath, filename, error_message): 305 error_message = str(error_message) 306 error_message = error_message.replace("\"", "") 307 error_message = error_message.replace("<", "") 308 error_message = error_message.replace(">", "") 309 error_message = error_message.replace("&", "") 310 if "AddressSanitizer" in error_message: 311 LOG.error("FUZZ TEST CRASH") 312 _create_fuzz_crash_file(filepath, filename) 313 elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', 314 error_message, re.M) is not None: 315 LOG.info("FUZZ TEST PASS") 316 _create_fuzz_pass_file(filepath, filename) 317 else: 318 LOG.error("FUZZ TEST UNAVAILABLE") 319 _create_empty_result_file(filepath, filename, error_message) 320 return 321 322 323class ResultManager(object): 324 def __init__(self, testsuit_path, config): 325 self.testsuite_path = testsuit_path 326 self.config = config 327 self.result_rootpath = self.config.report_path 328 self.device = self.config.device 329 if testsuit_path.endswith(".hap"): 330 self.device_testpath = self.config.test_hap_out_path 331 else: 332 self.device_testpath = self.config.target_test_path 333 self.testsuite_name = os.path.basename(self.testsuite_path) 334 self.is_coverage = False 335 336 def set_is_coverage(self, is_coverage): 337 self.is_coverage = is_coverage 338 339 def get_test_results(self, error_message=""): 340 # Get test result files 341 filepath, _ = self.obtain_test_result_file() 342 if "fuzztest" == self.config.testtype[0]: 343 LOG.info("create fuzz test report") 344 _create_fuzz_result_file(filepath, self.testsuite_name, 345 error_message) 346 if not self.is_coverage: 347 self._obtain_fuzz_corpus() 348 349 if not os.path.exists(filepath): 350 _create_empty_result_file(filepath, self.testsuite_name, 351 error_message) 352 if "benchmark" == self.config.testtype[0]: 353 self._obtain_benchmark_result() 354 # Get coverage data files 355 if self.is_coverage: 356 self.obtain_coverage_data() 357 358 return filepath 359 360 def get_test_results_hidelog(self, error_message=""): 361 # Get test result files 362 result_file_path, test_log_path = self.obtain_test_result_file() 363 log_content = "" 364 if not error_message: 365 if os.path.exists(test_log_path): 366 with open(test_log_path, "r") as log: 367 log_content = log.readlines() 368 else: 369 LOG.error("{}: Test log not exist.".format(test_log_path)) 370 else: 371 log_content = error_message 372 373 if "fuzztest" == self.config.testtype[0]: 374 LOG.info("create fuzz test report") 375 _create_fuzz_result_file(result_file_path, self.testsuite_name, 376 log_content) 377 if not self.is_coverage: 378 self._obtain_fuzz_corpus() 379 380 if not os.path.exists(result_file_path): 381 _create_empty_result_file(result_file_path, self.testsuite_name, 382 log_content) 383 if "benchmark" == self.config.testtype[0]: 384 self._obtain_benchmark_result() 385 # Get coverage data files 386 if self.is_coverage: 387 self.obtain_coverage_data() 388 389 return result_file_path 390 391 def get_result_sub_save_path(self): 392 find_key = os.sep + "benchmark" + os.sep 393 file_dir, _ = os.path.split(self.testsuite_path) 394 pos = file_dir.find(find_key) 395 subpath = "" 396 if -1 != pos: 397 subpath = file_dir[pos + len(find_key):] 398 LOG.info("subpath = " + subpath) 399 return subpath 400 401 def obtain_test_result_file(self): 402 result_save_path = get_result_savepath(self.testsuite_path, 403 self.result_rootpath) 404 405 result_file_path = os.path.join(result_save_path, 406 "%s.xml" % self.testsuite_name) 407 408 result_josn_file_path = os.path.join(result_save_path, 409 "%s.json" % self.testsuite_name) 410 411 if self.testsuite_path.endswith('.hap'): 412 remote_result_file = os.path.join(self.device_testpath, 413 "testcase_result.xml") 414 remote_json_result_file = os.path.join(self.device_testpath, 415 "%s.json" % self.testsuite_name) 416 else: 417 remote_result_file = os.path.join(self.device_testpath, 418 "%s.xml" % self.testsuite_name) 419 remote_json_result_file = os.path.join(self.device_testpath, 420 "%s.json" % self.testsuite_name) 421 422 if self.config.testtype[0] != "fuzztest": 423 if self.device.is_file_exist(remote_result_file): 424 self.device.pull_file(remote_result_file, result_file_path) 425 elif self.device.is_file_exist(remote_json_result_file): 426 self.device.pull_file(remote_json_result_file, 427 result_josn_file_path) 428 result_file_path = result_josn_file_path 429 else: 430 LOG.info("%s not exist", remote_result_file) 431 432 if self.config.hidelog: 433 remote_log_result_file = os.path.join(self.device_testpath, 434 "%s.log" % self.testsuite_name) 435 test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path) 436 test_log_file_path = os.path.join(test_log_save_path, 437 "%s.log" % self.testsuite_name) 438 self.device.pull_file(remote_log_result_file, test_log_file_path) 439 return result_file_path, test_log_file_path 440 441 return result_file_path, "" 442 443 def make_empty_result_file(self, error_message=""): 444 result_savepath = get_result_savepath(self.testsuite_path, 445 self.result_rootpath) 446 result_filepath = os.path.join(result_savepath, "%s.xml" % 447 self.testsuite_name) 448 if not os.path.exists(result_filepath): 449 _create_empty_result_file(result_filepath, 450 self.testsuite_name, error_message) 451 452 def is_exist_target_in_device(self, path, target): 453 if platform.system() == "Windows": 454 command = '\"ls -l %s | grep %s\"' % (path, target) 455 else: 456 command = "ls -l %s | grep %s" % (path, target) 457 458 check_result = False 459 stdout_info = self.device.execute_shell_command(command) 460 if stdout_info != "" and stdout_info.find(target) != -1: 461 check_result = True 462 return check_result 463 464 def obtain_coverage_data(self): 465 cov_root_dir = os.path.abspath(os.path.join( 466 self.result_rootpath, 467 "..", 468 "coverage", 469 "data", 470 "exec")) 471 472 473 tests_path = self.config.testcases_path 474 test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0] 475 cxx_cov_path = os.path.abspath(os.path.join( 476 self.result_rootpath, 477 "..", 478 "coverage", 479 "data", 480 "cxx", 481 self.testsuite_name + '_' + test_type)) 482 483 if os.path.basename(self.testsuite_name).startswith("rust_"): 484 target_name = "lib.unstripped" 485 else: 486 target_name = OBJ 487 if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): 488 if not os.path.exists(cxx_cov_path): 489 os.makedirs(cxx_cov_path) 490 else: 491 cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}" 492 self.config.device.execute_shell_command( 493 "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) 494 src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) 495 self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) 496 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 497 if platform.system() == "Windows": 498 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 499 process.communicate() 500 os.remove(tar_path) 501 os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)) 502 else: 503 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 504 (tar_path, cxx_cov_path), shell=True).communicate() 505 subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate() 506 if target_name != OBJ: 507 subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name), 508 os.path.join(cxx_cov_path, OBJ)), shell=True).communicate() 509 510 def _obtain_fuzz_corpus(self): 511 command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" 512 self.config.device.execute_shell_command(command) 513 result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) 514 LOG.info(f"fuzz_dir = {result_save_path}") 515 self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) 516 517 def _obtain_benchmark_result(self): 518 benchmark_root_dir = os.path.abspath( 519 os.path.join(self.result_rootpath, "benchmark")) 520 benchmark_dir = os.path.abspath( 521 os.path.join(benchmark_root_dir, 522 self.get_result_sub_save_path(), 523 self.testsuite_name)) 524 525 if not os.path.exists(benchmark_dir): 526 os.makedirs(benchmark_dir) 527 528 LOG.info("benchmark_dir = %s" % benchmark_dir) 529 self.device.pull_file(os.path.join(self.device_testpath, 530 "%s.json" % self.testsuite_name), benchmark_dir) 531 if not os.path.exists(os.path.join(benchmark_dir, 532 "%s.json" % self.testsuite_name)): 533 os.rmdir(benchmark_dir) 534 return benchmark_dir 535 536 537@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 538class CppTestDriver(IDriver): 539 """ 540 CppTest is a Test that runs a native test package on given device. 541 """ 542 # test driver config 543 config = None 544 result = "" 545 546 def __check_environment__(self, device_options): 547 if len(device_options) == 1 and device_options[0].label is None: 548 return True 549 if len(device_options) != 1 or \ 550 device_options[0].label != DeviceLabelType.phone: 551 return False 552 return True 553 554 def __check_config__(self, config): 555 pass 556 557 def __result__(self): 558 return self.result if os.path.exists(self.result) else "" 559 560 def __execute__(self, request): 561 try: 562 self.config = request.config 563 self.config.target_test_path = DEFAULT_TEST_PATH 564 self.config.device = request.config.environment.devices[0] 565 566 suite_file = request.root.source.source_file 567 LOG.debug("Testsuite FilePath: %s" % suite_file) 568 569 if not suite_file: 570 LOG.error("test source '%s' not exists" % 571 request.root.source.source_string) 572 return 573 574 if not self.config.device: 575 result = ResultManager(suite_file, self.config) 576 result.set_is_coverage(False) 577 result.make_empty_result_file( 578 "No test device is found. ") 579 return 580 self.config.device.set_device_report_path(request.config.report_path) 581 self.config.device.device_log_collector.start_hilog_task() 582 self._init_gtest() 583 self._run_gtest(suite_file) 584 585 finally: 586 log_path = get_result_savepath(request.root.source.source_file, request.config.report_path) 587 suit_name = os.path.basename(request.root.source.source_file) 588 xml_path = os.path.join(log_path, f"{suit_name}.xml") 589 if not os.path.exists(xml_path): 590 _create_empty_result_file(xml_path, suit_name, "ERROR") 591 serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) 592 log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( 593 ":", "_")) 594 try: 595 self.config.device.device_log_collector.stop_hilog_task( 596 log_tar_file_name, module_name=request.get_module_name()) 597 finally: 598 update_xml(request.root.source.source_file, xml_path) 599 600 @staticmethod 601 def _alter_init(name): 602 with open(name, "rb") as f: 603 lines = f.read() 604 str_content = lines.decode("utf-8") 605 606 pattern_sharp = '^\s*#.*$(\n|\r\n)' 607 pattern_star = '/\*.*?\*/(\n|\r\n)+' 608 pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+' 609 610 if re.match(pattern_sharp, str_content, flags=re.M): 611 striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) 612 elif re.findall(pattern_star, str_content, flags=re.S): 613 striped_content = re.sub(pattern_star, '', str_content, flags=re.S) 614 elif re.findall(pattern_xml, str_content, flags=re.S): 615 striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) 616 else: 617 striped_content = str_content 618 619 striped_bt = striped_content.encode("utf-8") 620 if os.path.exists(name): 621 os.remove(name) 622 with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: 623 f.write(striped_bt) 624 625 def _init_gtest(self): 626 self.config.device.connector_command("target mount") 627 self.config.device.execute_shell_command( 628 "rm -rf %s" % self.config.target_test_path) 629 self.config.device.execute_shell_command( 630 "mkdir -p %s" % self.config.target_test_path) 631 self.config.device.execute_shell_command( 632 "mount -o rw,remount,rw /") 633 if "fuzztest" == self.config.testtype[0]: 634 self.config.device.execute_shell_command( 635 "mkdir -p %s" % os.path.join(self.config.target_test_path, 636 "corpus")) 637 638 def _gtest_command(self, suite_file): 639 filename = os.path.basename(suite_file) 640 test_para = self._get_test_para(self.config.testcase, 641 self.config.testlevel, 642 self.config.testtype, 643 self.config.target_test_path, 644 suite_file, 645 filename) 646 647 # execute testcase 648 if not self.config.coverage: 649 if self.config.random == "random": 650 seed = random.randint(1, 100) 651 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % ( 652 self.config.target_test_path, 653 filename, 654 filename, 655 test_para, 656 seed) 657 else: 658 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( 659 self.config.target_test_path, 660 filename, 661 filename, 662 test_para) 663 else: 664 coverage_outpath = self.config.coverage_outpath 665 if coverage_outpath: 666 strip_num = len(coverage_outpath.strip("/").split("/")) 667 else: 668 ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json") 669 with open(ohos_config_path, 'r') as json_file: 670 json_info = json.load(json_file) 671 out_path = json_info.get("out_path") 672 strip_num = len(out_path.strip("/").split("/")) 673 if "fuzztest" == self.config.testtype[0]: 674 self._push_corpus_cov_if_exist(suite_file) 675 command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ 676 rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \ 677 GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" 678 else: 679 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \ 680 "GCOV_PREFIX_STRIP=%s ./%s %s" % \ 681 (self.config.target_test_path, 682 filename, 683 DEFAULT_TEST_PATH, 684 str(strip_num), 685 filename, 686 test_para) 687 688 if self.config.hidelog: 689 command += " > {}.log 2>&1".format(filename) 690 691 return command 692 693 def _run_gtest(self, suite_file): 694 from xdevice import Variables 695 is_coverage_test = True if self.config.coverage else False 696 697 # push testsuite file 698 self.config.device.push_file(suite_file, self.config.target_test_path) 699 self.config.device.execute_shell_command( 700 "hilog -d %s" % (os.path.join(self.config.target_test_path, 701 os.path.basename(suite_file))) 702 ) 703 self._push_corpus_if_exist(suite_file) 704 705 # push resource files 706 resource_manager = ResourceManager() 707 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 708 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 709 self.config.device) 710 711 command = self._gtest_command(suite_file) 712 713 result = ResultManager(suite_file, self.config) 714 result.set_is_coverage(is_coverage_test) 715 716 try: 717 # get result 718 if self.config.hidelog: 719 return_message = "" 720 display_receiver = CollectingOutputReceiver() 721 self.config.device.execute_shell_command( 722 command, 723 receiver=display_receiver, 724 timeout=TIME_OUT, 725 retry=0) 726 else: 727 display_receiver = DisplayOutputReceiver() 728 self.config.device.execute_shell_command( 729 command, 730 receiver=display_receiver, 731 timeout=TIME_OUT, 732 retry=0) 733 return_message = display_receiver.output 734 except (ExecuteTerminate, DeviceError) as exception: 735 return_message = str(exception.args) 736 737 if self.config.hidelog: 738 self.result = result.get_test_results_hidelog(return_message) 739 else: 740 self.result = result.get_test_results(return_message) 741 742 resource_manager.process_cleaner_data(resource_data_dic, 743 resource_dir, 744 self.config.device) 745 746 def _push_corpus_cov_if_exist(self, suite_file): 747 corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep) 748 cov_file = os.path.join( 749 sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz") 750 LOG.info("corpus_cov file :%s" % str(cov_file)) 751 self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) 752 753 def _push_corpus_if_exist(self, suite_file): 754 if "fuzztest" == self.config.testtype[0]: 755 corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") 756 if not os.path.isdir(corpus_path): 757 return 758 759 corpus_dirs = [] 760 corpus_file_list = [] 761 762 for root, _, files in os.walk(corpus_path): 763 if not files: 764 continue 765 766 corpus_dir = root.split("corpus")[-1] 767 if corpus_dir != "": 768 corpus_dirs.append(corpus_dir) 769 770 for file in files: 771 cp_file = os.path.normcase(os.path.join(root, file)) 772 corpus_file_list.append(cp_file) 773 if file == "init": 774 self._alter_init(cp_file) 775 776 # mkdir corpus files dir 777 if corpus_dirs: 778 for corpus in corpus_dirs: 779 mkdir_corpus_command = f"shell; mkdir -p {corpus}" 780 self.config.device.connector_command(mkdir_corpus_command) 781 782 # push corpus file 783 if corpus_file_list: 784 for corpus_file in corpus_file_list: 785 self.config.device.push_file(corpus_file, 786 os.path.join(self.config.target_test_path, "corpus")) 787 788 def _get_test_para(self, 789 testcase, 790 testlevel, 791 testtype, 792 target_test_path, 793 suite_file, 794 filename): 795 if "benchmark" == testtype[0]: 796 test_para = (" --benchmark_out_format=json" 797 " --benchmark_out=%s%s.json") % ( 798 target_test_path, filename) 799 return test_para 800 801 if "" != testcase and "" == testlevel: 802 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 803 elif "" == testcase and "" != testlevel: 804 level_para = get_level_para_string(testlevel) 805 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 806 else: 807 test_para = "" 808 809 if "fuzztest" == testtype[0]: 810 cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( 811 suite_file), "project.xml")).get_fuzzer_config("fuzztest") 812 LOG.info("config list :%s" % str(cfg_list)) 813 if self.config.coverage: 814 test_para += "corpus -runs=0" + \ 815 " -max_len=" + cfg_list[0] + \ 816 " -max_total_time=" + cfg_list[1] + \ 817 " -rss_limit_mb=" + cfg_list[2] 818 else: 819 test_para += "corpus -max_len=" + cfg_list[0] + \ 820 " -max_total_time=" + cfg_list[1] + \ 821 " -rss_limit_mb=" + cfg_list[2] 822 823 return test_para 824 825 826@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 827class JSUnitTestDriver(IDriver): 828 """ 829 JSUnitTestDriver is a Test that runs a native test package on given device. 830 """ 831 832 def __init__(self): 833 self.config = None 834 self.result = "" 835 self.start_time = None 836 self.ability_name = "" 837 self.package_name = "" 838 # log 839 self.hilog = None 840 self.hilog_proc = None 841 842 def __check_environment__(self, device_options): 843 pass 844 845 def __check_config__(self, config): 846 pass 847 848 def __result__(self): 849 return self.result if os.path.exists(self.result) else "" 850 851 def __execute__(self, request): 852 try: 853 LOG.info("developer_test driver") 854 self.config = request.config 855 self.config.target_test_path = DEFAULT_TEST_PATH 856 self.config.device = request.config.environment.devices[0] 857 858 suite_file = request.root.source.source_file 859 result_save_path = get_result_savepath(suite_file, self.config.report_path) 860 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 861 if not suite_file: 862 LOG.error("test source '%s' not exists" % 863 request.root.source.source_string) 864 return 865 866 if not self.config.device: 867 result = ResultManager(suite_file, self.config) 868 result.set_is_coverage(False) 869 result.make_empty_result_file( 870 "No test device is found") 871 return 872 873 package_name, ability_name = self._get_package_and_ability_name( 874 suite_file) 875 self.package_name = package_name 876 self.ability_name = ability_name 877 self.config.test_hap_out_path = \ 878 "/data/data/%s/files/" % self.package_name 879 self.config.device.connector_command("shell hilog -r") 880 881 self.hilog = get_device_log_file( 882 request.config.report_path, 883 request.config.device.__get_serial__() + "_" + request. 884 get_module_name(), 885 "device_hilog") 886 887 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 888 0o755) 889 890 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 891 self.config.device.device_log_collector.add_log_address(None, self.hilog) 892 _, self.hilog_proc = self.config.device.device_log_collector.\ 893 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 894 self._init_jsunit_test() 895 self._run_jsunit(suite_file, self.hilog) 896 hilog_file_pipe.flush() 897 self.generate_console_output(self.hilog, request) 898 xml_path = os.path.join( 899 request.config.report_path, "result", 900 '.'.join((request.get_module_name(), "xml"))) 901 shutil.move(xml_path, self.result) 902 finally: 903 self.config.device.device_log_collector.remove_log_address(None, self.hilog) 904 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 905 update_xml(request.root.source.source_file, self.result) 906 907 @staticmethod 908 def _get_acts_test_para(testcase, 909 testlevel, 910 testtype, 911 target_test_path, 912 suite_file, 913 filename): 914 if "actstest" == testtype[0]: 915 test_para = (" --actstest_out_format=json" 916 " --actstest_out=%s%s.json") % ( 917 target_test_path, filename) 918 return test_para 919 920 if "" != testcase and "" == testlevel: 921 test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) 922 elif "" == testcase and "" != testlevel: 923 level_para = get_level_para_string(testlevel) 924 test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) 925 else: 926 test_para = "" 927 return test_para 928 929 @staticmethod 930 def _get_hats_test_para(testcase, 931 testlevel, 932 testtype, 933 target_test_path, 934 suite_file, 935 filename): 936 if "hatstest" == testtype[0]: 937 test_hats_para = (" --hatstest_out_format=json" 938 " --hatstest_out=%s%s.json") % ( 939 target_test_path, filename) 940 return test_hats_para 941 942 if "" != testcase and "" == testlevel: 943 test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 944 elif "" == testcase and "" != testlevel: 945 level_para = get_level_para_string(testlevel) 946 test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 947 else: 948 test_hats_para = "" 949 return test_hats_para 950 951 @classmethod 952 def _get_json_shell_timeout(cls, json_filepath): 953 test_timeout = 0 954 try: 955 with open(json_filepath, 'r') as json_file: 956 data_dic = json.load(json_file) 957 if not data_dic: 958 return test_timeout 959 else: 960 if "driver" in data_dic.keys(): 961 driver_dict = data_dic.get("driver") 962 if driver_dict and "test-timeout" in driver_dict.keys(): 963 test_timeout = int(driver_dict["shell-timeout"]) / 1000 964 return test_timeout 965 except JSONDecodeError: 966 return test_timeout 967 finally: 968 print(" get json shell timeout finally") 969 970 @staticmethod 971 def _get_package_and_ability_name(hap_filepath): 972 package_name = "" 973 ability_name = "" 974 if os.path.exists(hap_filepath): 975 filename = os.path.basename(hap_filepath) 976 977 # unzip the hap file 978 hap_bak_path = os.path.abspath(os.path.join( 979 os.path.dirname(hap_filepath), 980 "%s.bak" % filename)) 981 zf_desc = zipfile.ZipFile(hap_filepath) 982 try: 983 zf_desc.extractall(path=hap_bak_path) 984 except RuntimeError as error: 985 print("Unzip error: ", hap_bak_path) 986 zf_desc.close() 987 988 # verify config.json file 989 app_profile_path = os.path.join(hap_bak_path, "config.json") 990 if not os.path.exists(app_profile_path): 991 print("file %s not exist" % app_profile_path) 992 return package_name, ability_name 993 994 if os.path.isdir(app_profile_path): 995 print("%s is a folder, and not a file" % app_profile_path) 996 return package_name, ability_name 997 998 # get package_name and ability_name value 999 load_dict = {} 1000 with open(app_profile_path, 'r') as load_f: 1001 load_dict = json.load(load_f) 1002 profile_list = load_dict.values() 1003 for profile in profile_list: 1004 package_name = profile.get("package") 1005 if not package_name: 1006 continue 1007 abilities = profile.get("abilities") 1008 for abilitie in abilities: 1009 abilities_name = abilitie.get("name") 1010 if abilities_name.startswith("."): 1011 ability_name = package_name + abilities_name[ 1012 abilities_name.find("."):] 1013 else: 1014 ability_name = abilities_name 1015 break 1016 break 1017 1018 # delete hap_bak_path 1019 if os.path.exists(hap_bak_path): 1020 shutil.rmtree(hap_bak_path) 1021 else: 1022 print("file %s not exist" % hap_filepath) 1023 return package_name, ability_name 1024 1025 def generate_console_output(self, device_log_file, request): 1026 result_message = self.read_device_log(device_log_file) 1027 1028 report_name = request.get_module_name() 1029 parsers = get_plugin( 1030 Plugin.PARSER, CommonParserType.jsunit) 1031 if parsers: 1032 parsers = parsers[:1] 1033 for listener in request.listeners: 1034 listener.device_sn = self.config.device.device_sn 1035 parser_instances = [] 1036 1037 for parser in parsers: 1038 parser_instance = parser.__class__() 1039 parser_instance.suites_name = report_name 1040 parser_instance.suite_name = report_name 1041 parser_instance.listeners = request.listeners 1042 parser_instances.append(parser_instance) 1043 handler = ShellHandler(parser_instances) 1044 process_command_ret(result_message, handler) 1045 1046 def read_device_log(self, device_log_file): 1047 result_message = "" 1048 with open(device_log_file, "r", encoding='utf-8', 1049 errors='ignore') as file_read_pipe: 1050 while True: 1051 data = file_read_pipe.readline() 1052 if not data: 1053 break 1054 # only filter JSApp log 1055 if data.lower().find(_ACE_LOG_MARKER) != -1: 1056 result_message += data 1057 if data.find("[end] run suites end") != -1: 1058 break 1059 return result_message 1060 1061 def start_hap_execute(self): 1062 try: 1063 command = "aa start -d 123 -a %s.MainAbility -b %s" \ 1064 % (self.package_name, self.package_name) 1065 self.start_time = time.time() 1066 result_value = self.config.device.execute_shell_command( 1067 command, timeout=TIME_OUT) 1068 1069 if "success" in str(result_value).lower(): 1070 LOG.info("execute %s's testcase success. result value=%s" 1071 % (self.package_name, result_value)) 1072 else: 1073 LOG.info("execute %s's testcase failed. result value=%s" 1074 % (self.package_name, result_value)) 1075 1076 _sleep_according_to_result(result_value) 1077 return_message = result_value 1078 except (ExecuteTerminate, DeviceError) as exception: 1079 return_message = exception.args 1080 1081 return return_message 1082 1083 def _init_jsunit_test(self): 1084 self.config.device.connector_command("target mount") 1085 self.config.device.execute_shell_command( 1086 "rm -rf %s" % self.config.target_test_path) 1087 self.config.device.execute_shell_command( 1088 "mkdir -p %s" % self.config.target_test_path) 1089 self.config.device.execute_shell_command( 1090 "mount -o rw,remount,rw /") 1091 1092 def _run_jsunit(self, suite_file, device_log_file): 1093 filename = os.path.basename(suite_file) 1094 _, suffix_name = os.path.splitext(filename) 1095 1096 resource_manager = ResourceManager() 1097 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 1098 if suffix_name == ".hap": 1099 json_file_path = suite_file.replace(".hap", ".json") 1100 if os.path.exists(json_file_path): 1101 timeout = self._get_json_shell_timeout(json_file_path) 1102 else: 1103 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1104 else: 1105 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1106 resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) 1107 main_result = self._install_hap(suite_file) 1108 result = ResultManager(suite_file, self.config) 1109 if main_result: 1110 self._execute_hapfile_jsunittest() 1111 try: 1112 status = False 1113 actiontime = JS_TIMEOUT 1114 times = CYCLE_TIMES 1115 if timeout: 1116 actiontime = timeout 1117 times = 1 1118 with open(device_log_file, "r", encoding='utf-8', 1119 errors='ignore') as file_read_pipe: 1120 for i in range(0, times): 1121 if status: 1122 break 1123 else: 1124 time.sleep(float(actiontime)) 1125 start_time = int(time.time()) 1126 while True: 1127 data = file_read_pipe.readline() 1128 if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1: 1129 LOG.info("execute testcase successfully.") 1130 status = True 1131 break 1132 if int(time.time()) - start_time > 5: 1133 break 1134 finally: 1135 _lock_screen(self.config.device) 1136 self._uninstall_hap(self.package_name) 1137 else: 1138 self.result = result.get_test_results("Error: install hap failed") 1139 LOG.error("Error: install hap failed") 1140 1141 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) 1142 1143 def _execute_hapfile_jsunittest(self): 1144 _unlock_screen(self.config.device) 1145 _unlock_device(self.config.device) 1146 1147 try: 1148 return_message = self.start_hap_execute() 1149 except (ExecuteTerminate, DeviceError) as exception: 1150 return_message = str(exception.args) 1151 1152 return return_message 1153 1154 def _install_hap(self, suite_file): 1155 message = self.config.device.connector_command("install %s" % suite_file) 1156 message = str(message).rstrip() 1157 if message == "" or "success" in message: 1158 return_code = True 1159 if message != "": 1160 LOG.info(message) 1161 else: 1162 return_code = False 1163 if message != "": 1164 LOG.warning(message) 1165 1166 _sleep_according_to_result(return_code) 1167 return return_code 1168 1169 def _uninstall_hap(self, package_name): 1170 return_message = self.config.device.execute_shell_command( 1171 "bm uninstall -n %s" % package_name) 1172 _sleep_according_to_result(return_message) 1173 return return_message 1174 1175 1176@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 1177class OHRustTestDriver(IDriver): 1178 def __init__(self): 1179 self.result = "" 1180 self.error_message = "" 1181 self.config = None 1182 1183 def __check_environment__(self, device_options): 1184 pass 1185 1186 def __check_config__(self, config): 1187 pass 1188 1189 def __execute__(self, request): 1190 try: 1191 LOG.debug("Start to execute open harmony rust test") 1192 self.config = request.config 1193 self.config.device = request.config.environment.devices[0] 1194 self.config.target_test_path = DEFAULT_TEST_PATH 1195 suite_file = request.root.source.source_file 1196 LOG.debug("Testsuite filepath:{}".format(suite_file)) 1197 1198 if not suite_file: 1199 LOG.error("test source '{}' not exists".format( 1200 request.root.source.source_string)) 1201 return 1202 1203 result_save_path = get_result_savepath(suite_file, self.config.report_path) 1204 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 1205 self.config.device.set_device_report_path(request.config.report_path) 1206 self.config.device.device_log_collector.start_hilog_task() 1207 self._init_oh_rust() 1208 self._run_oh_rust(suite_file, request) 1209 except Exception as exception: 1210 self.error_message = exception 1211 if not getattr(exception, "error_no", ""): 1212 setattr(exception, "error_no", "03409") 1213 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1214 finally: 1215 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 1216 time.time_ns()) 1217 log_tar_file_name = "{}_{}".format( 1218 request.get_module_name(), str(serial).replace(":", "_")) 1219 try: 1220 self.config.device.device_log_collector.stop_hilog_task( 1221 log_tar_file_name, module_name=request.get_module_name()) 1222 finally: 1223 xml_path = os.path.join( 1224 request.config.report_path, "result", 1225 '.'.join((request.get_module_name(), "xml"))) 1226 shutil.move(xml_path, self.result) 1227 update_xml(request.root.source.source_file, self.result) 1228 self.result = check_result_report( 1229 request.config.report_path, self.result, self.error_message) 1230 1231 def __result__(self): 1232 return self.result if os.path.exists(self.result) else "" 1233 1234 def _init_oh_rust(self): 1235 self.config.device.connector_command("target mount") 1236 self.config.device.execute_shell_command( 1237 "rm -rf %s" % self.config.target_test_path) 1238 self.config.device.execute_shell_command( 1239 "mkdir -p %s" % self.config.target_test_path) 1240 self.config.device.execute_shell_command( 1241 "mount -o rw,remount,rw /") 1242 1243 def _run_oh_rust(self, suite_file, request=None): 1244 self.config.device.push_file(suite_file, self.config.target_test_path) 1245 self.config.device.execute_shell_command( 1246 "hilog -d %s" % (os.path.join(self.config.target_test_path, 1247 os.path.basename(suite_file))) 1248 ) 1249 resource_manager = ResourceManager() 1250 resource_data_dict, resource_dir = \ 1251 resource_manager.get_resource_data_dic(suite_file) 1252 resource_manager.process_preparer_data(resource_data_dict, 1253 resource_dir, 1254 self.config.device) 1255 for listener in request.listeners: 1256 listener.device_sn = self.config.device.device_sn 1257 1258 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 1259 if parsers: 1260 parsers = parsers[:1] 1261 parser_instances = [] 1262 for parser in parsers: 1263 parser_instance = parser.__class__() 1264 parser_instance.suite_name = request.get_module_name() 1265 parser_instance.listeners = request.listeners 1266 parser_instances.append(parser_instance) 1267 handler = ShellHandler(parser_instances) 1268 if self.config.coverage: 1269 command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format( 1270 self.config.target_test_path, os.path.basename(suite_file)) 1271 else: 1272 command = "cd {}; chmod +x *; ./{}".format( 1273 self.config.target_test_path, os.path.basename(suite_file)) 1274 self.config.device.execute_shell_command( 1275 command, timeout=TIME_OUT, receiver=handler, retry=0) 1276 if self.config.coverage: 1277 result = ResultManager(suite_file, self.config) 1278 result.obtain_coverage_data() 1279 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 1280 self.config.device) 1281