1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import re 22import shutil 23import subprocess 24import sys 25import time 26import platform 27import zipfile 28import stat 29import random 30import xml.etree.ElementTree as ET 31from dataclasses import dataclass 32from json import JSONDecodeError 33 34from xdevice import DeviceTestType, check_result_report 35from xdevice import DeviceLabelType 36from xdevice import CommonParserType 37from xdevice import ExecuteTerminate 38from xdevice import DeviceError 39from xdevice import ShellHandler 40 41from xdevice import IDriver 42from xdevice import platform_logger 43from xdevice import Plugin 44from xdevice import get_plugin 45from ohos.environment.dmlib import process_command_ret 46from core.utils import get_decode 47from core.utils import get_fuzzer_path 48from core.config.resource_manager import ResourceManager 49from core.config.config_manager import FuzzerConfigManager 50 51__all__ = [ 52 "CppTestDriver", 53 "JSUnitTestDriver", 54 "disable_keyguard", 55 "GTestConst"] 56 57LOG = platform_logger("Drivers") 58DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 59OBJ = "obj" 60_ACE_LOG_MARKER = " a0c0d0" 61TIME_OUT = 900 * 1000 62JS_TIMEOUT = 10 63CYCLE_TIMES = 50 64 65FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL 66MODES = stat.S_IWUSR | stat.S_IRUSR 67 68 69class CollectingOutputReceiver: 70 def __init__(self): 71 self.output = "" 72 73 def __read__(self, output): 74 self.output = "%s%s" % (self.output, output) 75 76 def __error__(self, message): 77 pass 78 79 def __done__(self, result_code="", message=""): 80 pass 81 82 83class DisplayOutputReceiver: 84 def __init__(self): 85 self.output = "" 86 self.unfinished_line = "" 87 88 def __read__(self, output): 89 self.output = "%s%s" % (self.output, output) 90 lines = self._process_output(output) 91 for line in lines: 92 line = line.strip() 93 if line: 94 LOG.info(get_decode(line)) 95 96 def __error__(self, message): 97 pass 98 99 def __done__(self, result_code="", message=""): 100 pass 101 102 def _process_output(self, output, end_mark="\n"): 103 content = output 104 if self.unfinished_line: 105 content = "".join((self.unfinished_line, content)) 106 self.unfinished_line = "" 107 lines = content.split(end_mark) 108 if content.endswith(end_mark): 109 return lines[:-1] 110 else: 111 self.unfinished_line = lines[-1] 112 return lines[:-1] 113 114 115@dataclass 116class GTestConst(object): 117 exec_para_filter = "--gtest_filter" 118 exec_para_level = "--gtest_testsize" 119 exec_acts_para_filter = "--jstest_filter" 120 exec_acts_para_level = "--jstest_testsize" 121 122 123def get_device_log_file(report_path, serial=None, log_name="device_log"): 124 from xdevice import Variables 125 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 126 os.makedirs(log_path, exist_ok=True) 127 128 serial = serial or time.time_ns() 129 device_file_name = "{}_{}.log".format(log_name, serial) 130 device_log_file = os.path.join(log_path, device_file_name) 131 return device_log_file 132 133 134def get_level_para_string(level_string): 135 level_list = list(set(level_string.split(","))) 136 level_para_string = "" 137 for item in level_list: 138 if not item.isdigit(): 139 continue 140 item = item.strip(" ") 141 level_para_string = f"{level_para_string}Level{item}," 142 level_para_string = level_para_string.strip(",") 143 return level_para_string 144 145 146def get_result_savepath(testsuit_path, result_rootpath): 147 findkey = os.sep + "tests" + os.sep 148 filedir, _ = os.path.split(testsuit_path) 149 pos = filedir.find(findkey) 150 if -1 != pos: 151 subpath = filedir[pos + len(findkey):] 152 pos1 = subpath.find(os.sep) 153 if -1 != pos1: 154 subpath = subpath[pos1 + len(os.sep):] 155 result_path = os.path.join(result_rootpath, "result", subpath) 156 else: 157 result_path = os.path.join(result_rootpath, "result") 158 else: 159 result_path = os.path.join(result_rootpath, "result") 160 161 if not os.path.exists(result_path): 162 os.makedirs(result_path) 163 164 LOG.info("result_savepath = " + result_path) 165 return result_path 166 167 168def get_test_log_savepath(result_rootpath, result_suit_path): 169 suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0] 170 test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path) 171 172 if not os.path.exists(test_log_path): 173 os.makedirs(test_log_path) 174 175 LOG.info("test_log_savepath = {}".format(test_log_path)) 176 return test_log_path 177 178 179def update_xml(suite_file, result_xml): 180 suite_path_txt = suite_file.split(".")[0] + "_path.txt" 181 if os.path.exists(suite_path_txt) and os.path.exists(result_xml): 182 with open(suite_path_txt, "r") as path_text: 183 line = path_text.readline().replace("//", "").strip() 184 tree = ET.parse(result_xml) 185 tree.getroot().attrib["path"] = line 186 tree.write(result_xml) 187 188 189# all testsuit common Unavailable test result xml 190def _create_empty_result_file(filepath, filename, error_message): 191 error_message = str(error_message) 192 error_message = error_message.replace("\"", "") 193 error_message = error_message.replace("<", "") 194 error_message = error_message.replace(">", "") 195 error_message = error_message.replace("&", "") 196 if filename.endswith(".hap"): 197 filename = filename.split(".")[0] 198 if not os.path.exists(filepath): 199 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 200 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 201 time.localtime()) 202 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 203 file_desc.write( 204 '<testsuites tests="0" failures="0" ' 205 'disabled="0" errors="0" timestamp="%s" ' 206 'time="0" name="AllTests">\n' % time_stamp) 207 file_desc.write( 208 ' <testsuite name="%s" tests="0" failures="0" ' 209 'disabled="0" errors="0" time="0.0" ' 210 'unavailable="1" message="%s">\n' % 211 (filename, error_message)) 212 file_desc.write(' </testsuite>\n') 213 file_desc.write('</testsuites>\n') 214 return 215 216 217def _unlock_screen(device): 218 device.execute_shell_command("svc power stayon true") 219 time.sleep(1) 220 221 222def _unlock_device(device): 223 device.execute_shell_command("input keyevent 82") 224 time.sleep(1) 225 device.execute_shell_command("wm dismiss-keyguard") 226 time.sleep(1) 227 228 229def _lock_screen(device): 230 device.execute_shell_command("svc power stayon false") 231 time.sleep(1) 232 233 234def disable_keyguard(device): 235 _unlock_screen(device) 236 _unlock_device(device) 237 238 239def _sleep_according_to_result(result): 240 if result: 241 time.sleep(1) 242 243 244def _create_fuzz_crash_file(filepath, filename): 245 if not os.path.exists(filepath): 246 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 247 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 248 time.localtime()) 249 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 250 file_desc.write( 251 '<testsuites disabled="0" name="AllTests" ' 252 'time="300" timestamp="%s" errors="0" ' 253 'failures="1" tests="1">\n' % time_stamp) 254 file_desc.write( 255 ' <testsuite disabled="0" name="%s" time="300" ' 256 'errors="0" failures="1" tests="1">\n' % filename) 257 file_desc.write( 258 ' <testcase name="%s" time="300" classname="%s" ' 259 'status="run">\n' % (filename, filename)) 260 file_desc.write( 261 ' <failure type="" ' 262 'message="Fuzzer crash. See ERROR in log file">\n') 263 file_desc.write(' </failure>\n') 264 file_desc.write(' </testcase>\n') 265 file_desc.write(' </testsuite>\n') 266 file_desc.write('</testsuites>\n') 267 return 268 269 270def _create_fuzz_pass_file(filepath, filename): 271 if not os.path.exists(filepath): 272 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 273 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 274 time.localtime()) 275 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 276 file_desc.write( 277 '<testsuites disabled="0" name="AllTests" ' 278 'time="300" timestamp="%s" errors="0" ' 279 'failures="0" tests="1">\n' % time_stamp) 280 file_desc.write( 281 ' <testsuite disabled="0" name="%s" time="300" ' 282 'errors="0" failures="0" tests="1">\n' % filename) 283 file_desc.write( 284 ' <testcase name="%s" time="300" classname="%s" ' 285 'status="run"/>\n' % (filename, filename)) 286 file_desc.write(' </testsuite>\n') 287 file_desc.write('</testsuites>\n') 288 return 289 290 291def _create_fuzz_result_file(filepath, filename, error_message): 292 error_message = str(error_message) 293 error_message = error_message.replace("\"", "") 294 error_message = error_message.replace("<", "") 295 error_message = error_message.replace(">", "") 296 error_message = error_message.replace("&", "") 297 if "AddressSanitizer" in error_message: 298 LOG.error("FUZZ TEST CRASH") 299 _create_fuzz_crash_file(filepath, filename) 300 elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', 301 error_message, re.M) is not None: 302 LOG.info("FUZZ TEST PASS") 303 _create_fuzz_pass_file(filepath, filename) 304 else: 305 LOG.error("FUZZ TEST UNAVAILABLE") 306 _create_empty_result_file(filepath, filename, error_message) 307 return 308 309 310class ResultManager(object): 311 def __init__(self, testsuit_path, config): 312 self.testsuite_path = testsuit_path 313 self.config = config 314 self.result_rootpath = self.config.report_path 315 self.device = self.config.device 316 if testsuit_path.endswith(".hap"): 317 self.device_testpath = self.config.test_hap_out_path 318 else: 319 self.device_testpath = self.config.target_test_path 320 self.testsuite_name = os.path.basename(self.testsuite_path) 321 self.is_coverage = False 322 323 def set_is_coverage(self, is_coverage): 324 self.is_coverage = is_coverage 325 326 def get_test_results(self, error_message=""): 327 # Get test result files 328 filepath, _ = self.obtain_test_result_file() 329 if "fuzztest" == self.config.testtype[0]: 330 LOG.info("create fuzz test report") 331 _create_fuzz_result_file(filepath, self.testsuite_name, 332 error_message) 333 if not self.is_coverage: 334 self._obtain_fuzz_corpus() 335 336 if not os.path.exists(filepath): 337 _create_empty_result_file(filepath, self.testsuite_name, 338 error_message) 339 if "benchmark" == self.config.testtype[0]: 340 self._obtain_benchmark_result() 341 # Get coverage data files 342 if self.is_coverage: 343 self.obtain_coverage_data() 344 345 return filepath 346 347 def get_test_results_hidelog(self, error_message=""): 348 # Get test result files 349 result_file_path, test_log_path = self.obtain_test_result_file() 350 log_content = "" 351 if not error_message: 352 if os.path.exists(test_log_path): 353 with open(test_log_path, "r") as log: 354 log_content = log.readlines() 355 else: 356 LOG.error("{}: Test log not exist.".format(test_log_path)) 357 else: 358 log_content = error_message 359 360 if "fuzztest" == self.config.testtype[0]: 361 LOG.info("create fuzz test report") 362 _create_fuzz_result_file(result_file_path, self.testsuite_name, 363 log_content) 364 if not self.is_coverage: 365 self._obtain_fuzz_corpus() 366 367 if not os.path.exists(result_file_path): 368 _create_empty_result_file(result_file_path, self.testsuite_name, 369 log_content) 370 if "benchmark" == self.config.testtype[0]: 371 self._obtain_benchmark_result() 372 # Get coverage data files 373 if self.is_coverage: 374 self.obtain_coverage_data() 375 376 return result_file_path 377 378 def get_result_sub_save_path(self): 379 find_key = os.sep + "benchmark" + os.sep 380 file_dir, _ = os.path.split(self.testsuite_path) 381 pos = file_dir.find(find_key) 382 subpath = "" 383 if -1 != pos: 384 subpath = file_dir[pos + len(find_key):] 385 LOG.info("subpath = " + subpath) 386 return subpath 387 388 def obtain_test_result_file(self): 389 result_save_path = get_result_savepath(self.testsuite_path, 390 self.result_rootpath) 391 392 result_file_path = os.path.join(result_save_path, 393 "%s.xml" % self.testsuite_name) 394 395 result_josn_file_path = os.path.join(result_save_path, 396 "%s.json" % self.testsuite_name) 397 398 if self.testsuite_path.endswith('.hap'): 399 remote_result_file = os.path.join(self.device_testpath, 400 "testcase_result.xml") 401 remote_json_result_file = os.path.join(self.device_testpath, 402 "%s.json" % self.testsuite_name) 403 else: 404 remote_result_file = os.path.join(self.device_testpath, 405 "%s.xml" % self.testsuite_name) 406 remote_json_result_file = os.path.join(self.device_testpath, 407 "%s.json" % self.testsuite_name) 408 409 if self.config.testtype[0] != "fuzztest": 410 if self.device.is_file_exist(remote_result_file): 411 self.device.pull_file(remote_result_file, result_file_path) 412 elif self.device.is_file_exist(remote_json_result_file): 413 self.device.pull_file(remote_json_result_file, 414 result_josn_file_path) 415 result_file_path = result_josn_file_path 416 else: 417 LOG.info("%s not exist", remote_result_file) 418 419 if self.config.hidelog: 420 remote_log_result_file = os.path.join(self.device_testpath, 421 "%s.log" % self.testsuite_name) 422 test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path) 423 test_log_file_path = os.path.join(test_log_save_path, 424 "%s.log" % self.testsuite_name) 425 self.device.pull_file(remote_log_result_file, test_log_file_path) 426 return result_file_path, test_log_file_path 427 428 return result_file_path, "" 429 430 def make_empty_result_file(self, error_message=""): 431 result_savepath = get_result_savepath(self.testsuite_path, 432 self.result_rootpath) 433 result_filepath = os.path.join(result_savepath, "%s.xml" % 434 self.testsuite_name) 435 if not os.path.exists(result_filepath): 436 _create_empty_result_file(result_filepath, 437 self.testsuite_name, error_message) 438 439 def is_exist_target_in_device(self, path, target): 440 if platform.system() == "Windows": 441 command = '\"ls -l %s | grep %s\"' % (path, target) 442 else: 443 command = "ls -l %s | grep %s" % (path, target) 444 445 check_result = False 446 stdout_info = self.device.execute_shell_command(command) 447 if stdout_info != "" and stdout_info.find(target) != -1: 448 check_result = True 449 return check_result 450 451 def obtain_coverage_data(self): 452 cov_root_dir = os.path.abspath(os.path.join( 453 self.result_rootpath, 454 "..", 455 "coverage", 456 "data", 457 "exec")) 458 459 460 tests_path = self.config.testcases_path 461 test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0] 462 cxx_cov_path = os.path.abspath(os.path.join( 463 self.result_rootpath, 464 "..", 465 "coverage", 466 "data", 467 "cxx", 468 self.testsuite_name + '_' + test_type)) 469 470 if os.path.basename(self.testsuite_name).startswith("rust_"): 471 target_name = "lib.unstripped" 472 else: 473 target_name = OBJ 474 if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): 475 if not os.path.exists(cxx_cov_path): 476 os.makedirs(cxx_cov_path) 477 else: 478 cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}" 479 self.config.device.execute_shell_command( 480 "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) 481 src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) 482 self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) 483 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 484 if platform.system() == "Windows": 485 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 486 process.communicate() 487 os.remove(tar_path) 488 os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)) 489 else: 490 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 491 (tar_path, cxx_cov_path), shell=True).communicate() 492 subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate() 493 if target_name != OBJ: 494 subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name), 495 os.path.join(cxx_cov_path, OBJ)), shell=True).communicate() 496 497 def _obtain_fuzz_corpus(self): 498 command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" 499 self.config.device.execute_shell_command(command) 500 result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) 501 LOG.info(f"fuzz_dir = {result_save_path}") 502 self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) 503 504 def _obtain_benchmark_result(self): 505 benchmark_root_dir = os.path.abspath( 506 os.path.join(self.result_rootpath, "benchmark")) 507 benchmark_dir = os.path.abspath( 508 os.path.join(benchmark_root_dir, 509 self.get_result_sub_save_path(), 510 self.testsuite_name)) 511 512 if not os.path.exists(benchmark_dir): 513 os.makedirs(benchmark_dir) 514 515 LOG.info("benchmark_dir = %s" % benchmark_dir) 516 self.device.pull_file(os.path.join(self.device_testpath, 517 "%s.json" % self.testsuite_name), benchmark_dir) 518 if not os.path.exists(os.path.join(benchmark_dir, 519 "%s.json" % self.testsuite_name)): 520 os.rmdir(benchmark_dir) 521 return benchmark_dir 522 523 524@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 525class CppTestDriver(IDriver): 526 """ 527 CppTest is a Test that runs a native test package on given device. 528 """ 529 # test driver config 530 config = None 531 result = "" 532 533 def __check_environment__(self, device_options): 534 if len(device_options) == 1 and device_options[0].label is None: 535 return True 536 if len(device_options) != 1 or \ 537 device_options[0].label != DeviceLabelType.phone: 538 return False 539 return True 540 541 def __check_config__(self, config): 542 pass 543 544 def __result__(self): 545 return self.result if os.path.exists(self.result) else "" 546 547 def __execute__(self, request): 548 try: 549 self.config = request.config 550 self.config.target_test_path = DEFAULT_TEST_PATH 551 self.config.device = request.config.environment.devices[0] 552 553 suite_file = request.root.source.source_file 554 LOG.debug("Testsuite FilePath: %s" % suite_file) 555 556 if not suite_file: 557 LOG.error("test source '%s' not exists" % 558 request.root.source.source_string) 559 return 560 561 if not self.config.device: 562 result = ResultManager(suite_file, self.config) 563 result.set_is_coverage(False) 564 result.make_empty_result_file( 565 "No test device is found. ") 566 return 567 self.config.device.set_device_report_path(request.config.report_path) 568 self.config.device.device_log_collector.start_hilog_task() 569 self._init_gtest() 570 self._run_gtest(suite_file) 571 572 finally: 573 log_path = get_result_savepath(request.root.source.source_file, request.config.report_path) 574 suit_name = os.path.basename(request.root.source.source_file) 575 xml_path = os.path.join(log_path, f"{suit_name}.xml") 576 if not os.path.exists(xml_path): 577 _create_empty_result_file(xml_path, suit_name, "ERROR") 578 update_xml(request.root.source.source_file, xml_path) 579 serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) 580 log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( 581 ":", "_")) 582 self.config.device.device_log_collector.stop_hilog_task( 583 log_tar_file_name, module_name=request.get_module_name()) 584 585 @staticmethod 586 def _alter_init(name): 587 with open(name, "rb") as f: 588 lines = f.read() 589 str_content = lines.decode("utf-8") 590 591 pattern_sharp = '^\s*#.*$(\n|\r\n)' 592 pattern_star = '/\*.*?\*/(\n|\r\n)+' 593 pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+' 594 595 if re.match(pattern_sharp, str_content, flags=re.M): 596 striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) 597 elif re.findall(pattern_star, str_content, flags=re.S): 598 striped_content = re.sub(pattern_star, '', str_content, flags=re.S) 599 elif re.findall(pattern_xml, str_content, flags=re.S): 600 striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) 601 else: 602 striped_content = str_content 603 604 striped_bt = striped_content.encode("utf-8") 605 if os.path.exists(name): 606 os.remove(name) 607 with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: 608 f.write(striped_bt) 609 610 def _init_gtest(self): 611 self.config.device.connector_command("target mount") 612 self.config.device.execute_shell_command( 613 "rm -rf %s" % self.config.target_test_path) 614 self.config.device.execute_shell_command( 615 "mkdir -p %s" % self.config.target_test_path) 616 self.config.device.execute_shell_command( 617 "mount -o rw,remount,rw /") 618 if "fuzztest" == self.config.testtype[0]: 619 self.config.device.execute_shell_command( 620 "mkdir -p %s" % os.path.join(self.config.target_test_path, 621 "corpus")) 622 623 def _gtest_command(self, suite_file): 624 filename = os.path.basename(suite_file) 625 test_para = self._get_test_para(self.config.testcase, 626 self.config.testlevel, 627 self.config.testtype, 628 self.config.target_test_path, 629 suite_file, 630 filename) 631 632 # execute testcase 633 if not self.config.coverage: 634 if self.config.random == "random": 635 seed = random.randint(1, 100) 636 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % ( 637 self.config.target_test_path, 638 filename, 639 filename, 640 test_para, 641 seed) 642 else: 643 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( 644 self.config.target_test_path, 645 filename, 646 filename, 647 test_para) 648 else: 649 coverage_outpath = self.config.coverage_outpath 650 if coverage_outpath: 651 strip_num = len(coverage_outpath.strip("/").split("/")) 652 else: 653 ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json") 654 with open(ohos_config_path, 'r') as json_file: 655 json_info = json.load(json_file) 656 out_path = json_info.get("out_path") 657 strip_num = len(out_path.strip("/").split("/")) 658 if "fuzztest" == self.config.testtype[0]: 659 self._push_corpus_cov_if_exist(suite_file) 660 command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ 661 rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \ 662 GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" 663 else: 664 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \ 665 "GCOV_PREFIX_STRIP=%s ./%s %s" % \ 666 (self.config.target_test_path, 667 filename, 668 DEFAULT_TEST_PATH, 669 str(strip_num), 670 filename, 671 test_para) 672 673 if self.config.hidelog: 674 command += " > {}.log 2>&1".format(filename) 675 676 return command 677 678 def _run_gtest(self, suite_file): 679 from xdevice import Variables 680 is_coverage_test = True if self.config.coverage else False 681 682 # push testsuite file 683 self.config.device.push_file(suite_file, self.config.target_test_path) 684 self.config.device.execute_shell_command( 685 "hilog -d %s" % (os.path.join(self.config.target_test_path, 686 os.path.basename(suite_file))) 687 ) 688 self._push_corpus_if_exist(suite_file) 689 690 # push resource files 691 resource_manager = ResourceManager() 692 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 693 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 694 self.config.device) 695 696 command = self._gtest_command(suite_file) 697 698 result = ResultManager(suite_file, self.config) 699 result.set_is_coverage(is_coverage_test) 700 701 try: 702 # get result 703 if self.config.hidelog: 704 return_message = "" 705 display_receiver = CollectingOutputReceiver() 706 self.config.device.execute_shell_command( 707 command, 708 receiver=display_receiver, 709 timeout=TIME_OUT, 710 retry=0) 711 else: 712 display_receiver = DisplayOutputReceiver() 713 self.config.device.execute_shell_command( 714 command, 715 receiver=display_receiver, 716 timeout=TIME_OUT, 717 retry=0) 718 return_message = display_receiver.output 719 except (ExecuteTerminate, DeviceError) as exception: 720 return_message = str(exception.args) 721 722 if self.config.hidelog: 723 self.result = result.get_test_results_hidelog(return_message) 724 else: 725 self.result = result.get_test_results(return_message) 726 update_xml(suite_file, self.result) 727 728 resource_manager.process_cleaner_data(resource_data_dic, 729 resource_dir, 730 self.config.device) 731 732 def _push_corpus_cov_if_exist(self, suite_file): 733 corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep) 734 cov_file = os.path.join( 735 sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz") 736 LOG.info("corpus_cov file :%s" % str(cov_file)) 737 self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) 738 739 def _push_corpus_if_exist(self, suite_file): 740 if "fuzztest" == self.config.testtype[0]: 741 corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") 742 if not os.path.isdir(corpus_path): 743 return 744 745 corpus_dirs = [] 746 corpus_file_list = [] 747 748 for root, _, files in os.walk(corpus_path): 749 if not files: 750 continue 751 752 corpus_dir = root.split("corpus")[-1] 753 if corpus_dir != "": 754 corpus_dirs.append(corpus_dir) 755 756 for file in files: 757 cp_file = os.path.normcase(os.path.join(root, file)) 758 corpus_file_list.append(cp_file) 759 if file == "init": 760 self._alter_init(cp_file) 761 762 # mkdir corpus files dir 763 if corpus_dirs: 764 for corpus in corpus_dirs: 765 mkdir_corpus_command = f"shell; mkdir -p {corpus}" 766 self.config.device.connector_command(mkdir_corpus_command) 767 768 # push corpus file 769 if corpus_file_list: 770 for corpus_file in corpus_file_list: 771 self.config.device.push_file(corpus_file, 772 os.path.join(self.config.target_test_path, "corpus")) 773 774 def _get_test_para(self, 775 testcase, 776 testlevel, 777 testtype, 778 target_test_path, 779 suite_file, 780 filename): 781 if "benchmark" == testtype[0]: 782 test_para = (" --benchmark_out_format=json" 783 " --benchmark_out=%s%s.json") % ( 784 target_test_path, filename) 785 return test_para 786 787 if "" != testcase and "" == testlevel: 788 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 789 elif "" == testcase and "" != testlevel: 790 level_para = get_level_para_string(testlevel) 791 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 792 else: 793 test_para = "" 794 795 if "fuzztest" == testtype[0]: 796 cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( 797 suite_file), "project.xml")).get_fuzzer_config("fuzztest") 798 LOG.info("config list :%s" % str(cfg_list)) 799 if self.config.coverage: 800 test_para += "corpus -runs=0" + \ 801 " -max_len=" + cfg_list[0] + \ 802 " -max_total_time=" + cfg_list[1] + \ 803 " -rss_limit_mb=" + cfg_list[2] 804 else: 805 test_para += "corpus -max_len=" + cfg_list[0] + \ 806 " -max_total_time=" + cfg_list[1] + \ 807 " -rss_limit_mb=" + cfg_list[2] 808 809 return test_para 810 811 812@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 813class JSUnitTestDriver(IDriver): 814 """ 815 JSUnitTestDriver is a Test that runs a native test package on given device. 816 """ 817 818 def __init__(self): 819 self.config = None 820 self.result = "" 821 self.start_time = None 822 self.ability_name = "" 823 self.package_name = "" 824 # log 825 self.hilog = None 826 self.hilog_proc = None 827 828 def __check_environment__(self, device_options): 829 pass 830 831 def __check_config__(self, config): 832 pass 833 834 def __result__(self): 835 return self.result if os.path.exists(self.result) else "" 836 837 def __execute__(self, request): 838 try: 839 LOG.info("developer_test driver") 840 self.config = request.config 841 self.config.target_test_path = DEFAULT_TEST_PATH 842 self.config.device = request.config.environment.devices[0] 843 844 suite_file = request.root.source.source_file 845 result_save_path = get_result_savepath(suite_file, self.config.report_path) 846 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 847 if not suite_file: 848 LOG.error("test source '%s' not exists" % 849 request.root.source.source_string) 850 return 851 852 if not self.config.device: 853 result = ResultManager(suite_file, self.config) 854 result.set_is_coverage(False) 855 result.make_empty_result_file( 856 "No test device is found") 857 return 858 859 package_name, ability_name = self._get_package_and_ability_name( 860 suite_file) 861 self.package_name = package_name 862 self.ability_name = ability_name 863 self.config.test_hap_out_path = \ 864 "/data/data/%s/files/" % self.package_name 865 self.config.device.connector_command("shell hilog -r") 866 867 self.hilog = get_device_log_file( 868 request.config.report_path, 869 request.config.device.__get_serial__() + "_" + request. 870 get_module_name(), 871 "device_hilog") 872 873 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 874 0o755) 875 876 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 877 self.config.device.device_log_collector.add_log_address(None, self.hilog) 878 _, self.hilog_proc = self.config.device.device_log_collector.\ 879 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 880 self._init_jsunit_test() 881 self._run_jsunit(suite_file, self.hilog) 882 hilog_file_pipe.flush() 883 self.generate_console_output(self.hilog, request) 884 xml_path = os.path.join( 885 request.config.report_path, "result", 886 '.'.join((request.get_module_name(), "xml"))) 887 shutil.move(xml_path, self.result) 888 update_xml(suite_file, self.result) 889 finally: 890 self.config.device.device_log_collector.remove_log_address(None, self.hilog) 891 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 892 893 @staticmethod 894 def _get_acts_test_para(testcase, 895 testlevel, 896 testtype, 897 target_test_path, 898 suite_file, 899 filename): 900 if "actstest" == testtype[0]: 901 test_para = (" --actstest_out_format=json" 902 " --actstest_out=%s%s.json") % ( 903 target_test_path, filename) 904 return test_para 905 906 if "" != testcase and "" == testlevel: 907 test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) 908 elif "" == testcase and "" != testlevel: 909 level_para = get_level_para_string(testlevel) 910 test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) 911 else: 912 test_para = "" 913 return test_para 914 915 @staticmethod 916 def _get_hats_test_para(testcase, 917 testlevel, 918 testtype, 919 target_test_path, 920 suite_file, 921 filename): 922 if "hatstest" == testtype[0]: 923 test_hats_para = (" --hatstest_out_format=json" 924 " --hatstest_out=%s%s.json") % ( 925 target_test_path, filename) 926 return test_hats_para 927 928 if "" != testcase and "" == testlevel: 929 test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 930 elif "" == testcase and "" != testlevel: 931 level_para = get_level_para_string(testlevel) 932 test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 933 else: 934 test_hats_para = "" 935 return test_hats_para 936 937 @classmethod 938 def _get_json_shell_timeout(cls, json_filepath): 939 test_timeout = 0 940 try: 941 with open(json_filepath, 'r') as json_file: 942 data_dic = json.load(json_file) 943 if not data_dic: 944 return test_timeout 945 else: 946 if "driver" in data_dic.keys(): 947 driver_dict = data_dic.get("driver") 948 if driver_dict and "test-timeout" in driver_dict.keys(): 949 test_timeout = int(driver_dict["shell-timeout"]) / 1000 950 return test_timeout 951 except JSONDecodeError: 952 return test_timeout 953 finally: 954 print(" get json shell timeout finally") 955 956 @staticmethod 957 def _get_package_and_ability_name(hap_filepath): 958 package_name = "" 959 ability_name = "" 960 if os.path.exists(hap_filepath): 961 filename = os.path.basename(hap_filepath) 962 963 # unzip the hap file 964 hap_bak_path = os.path.abspath(os.path.join( 965 os.path.dirname(hap_filepath), 966 "%s.bak" % filename)) 967 zf_desc = zipfile.ZipFile(hap_filepath) 968 try: 969 zf_desc.extractall(path=hap_bak_path) 970 except RuntimeError as error: 971 print("Unzip error: ", hap_bak_path) 972 zf_desc.close() 973 974 # verify config.json file 975 app_profile_path = os.path.join(hap_bak_path, "config.json") 976 if not os.path.exists(app_profile_path): 977 print("file %s not exist" % app_profile_path) 978 return package_name, ability_name 979 980 if os.path.isdir(app_profile_path): 981 print("%s is a folder, and not a file" % app_profile_path) 982 return package_name, ability_name 983 984 # get package_name and ability_name value 985 load_dict = {} 986 with open(app_profile_path, 'r') as load_f: 987 load_dict = json.load(load_f) 988 profile_list = load_dict.values() 989 for profile in profile_list: 990 package_name = profile.get("package") 991 if not package_name: 992 continue 993 abilities = profile.get("abilities") 994 for abilitie in abilities: 995 abilities_name = abilitie.get("name") 996 if abilities_name.startswith("."): 997 ability_name = package_name + abilities_name[ 998 abilities_name.find("."):] 999 else: 1000 ability_name = abilities_name 1001 break 1002 break 1003 1004 # delete hap_bak_path 1005 if os.path.exists(hap_bak_path): 1006 shutil.rmtree(hap_bak_path) 1007 else: 1008 print("file %s not exist" % hap_filepath) 1009 return package_name, ability_name 1010 1011 def generate_console_output(self, device_log_file, request): 1012 result_message = self.read_device_log(device_log_file) 1013 1014 report_name = request.get_module_name() 1015 parsers = get_plugin( 1016 Plugin.PARSER, CommonParserType.jsunit) 1017 if parsers: 1018 parsers = parsers[:1] 1019 for listener in request.listeners: 1020 listener.device_sn = self.config.device.device_sn 1021 parser_instances = [] 1022 1023 for parser in parsers: 1024 parser_instance = parser.__class__() 1025 parser_instance.suites_name = report_name 1026 parser_instance.suite_name = report_name 1027 parser_instance.listeners = request.listeners 1028 parser_instances.append(parser_instance) 1029 handler = ShellHandler(parser_instances) 1030 process_command_ret(result_message, handler) 1031 1032 def read_device_log(self, device_log_file): 1033 result_message = "" 1034 with open(device_log_file, "r", encoding='utf-8', 1035 errors='ignore') as file_read_pipe: 1036 while True: 1037 data = file_read_pipe.readline() 1038 if not data: 1039 break 1040 # only filter JSApp log 1041 if data.lower().find(_ACE_LOG_MARKER) != -1: 1042 result_message += data 1043 if data.find("[end] run suites end") != -1: 1044 break 1045 return result_message 1046 1047 def start_hap_execute(self): 1048 try: 1049 command = "aa start -d 123 -a %s.MainAbility -b %s" \ 1050 % (self.package_name, self.package_name) 1051 self.start_time = time.time() 1052 result_value = self.config.device.execute_shell_command( 1053 command, timeout=TIME_OUT) 1054 1055 if "success" in str(result_value).lower(): 1056 LOG.info("execute %s's testcase success. result value=%s" 1057 % (self.package_name, result_value)) 1058 else: 1059 LOG.info("execute %s's testcase failed. result value=%s" 1060 % (self.package_name, result_value)) 1061 1062 _sleep_according_to_result(result_value) 1063 return_message = result_value 1064 except (ExecuteTerminate, DeviceError) as exception: 1065 return_message = exception.args 1066 1067 return return_message 1068 1069 def _init_jsunit_test(self): 1070 self.config.device.connector_command("target mount") 1071 self.config.device.execute_shell_command( 1072 "rm -rf %s" % self.config.target_test_path) 1073 self.config.device.execute_shell_command( 1074 "mkdir -p %s" % self.config.target_test_path) 1075 self.config.device.execute_shell_command( 1076 "mount -o rw,remount,rw /") 1077 1078 def _run_jsunit(self, suite_file, device_log_file): 1079 filename = os.path.basename(suite_file) 1080 _, suffix_name = os.path.splitext(filename) 1081 1082 resource_manager = ResourceManager() 1083 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 1084 if suffix_name == ".hap": 1085 json_file_path = suite_file.replace(".hap", ".json") 1086 if os.path.exists(json_file_path): 1087 timeout = self._get_json_shell_timeout(json_file_path) 1088 else: 1089 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1090 else: 1091 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1092 resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) 1093 main_result = self._install_hap(suite_file) 1094 result = ResultManager(suite_file, self.config) 1095 if main_result: 1096 self._execute_hapfile_jsunittest() 1097 try: 1098 status = False 1099 actiontime = JS_TIMEOUT 1100 times = CYCLE_TIMES 1101 if timeout: 1102 actiontime = timeout 1103 times = 1 1104 with open(device_log_file, "r", encoding='utf-8', 1105 errors='ignore') as file_read_pipe: 1106 for i in range(0, times): 1107 if status: 1108 break 1109 else: 1110 time.sleep(float(actiontime)) 1111 start_time = int(time.time()) 1112 while True: 1113 data = file_read_pipe.readline() 1114 if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1: 1115 LOG.info("execute testcase successfully.") 1116 status = True 1117 break 1118 if int(time.time()) - start_time > 5: 1119 break 1120 finally: 1121 _lock_screen(self.config.device) 1122 self._uninstall_hap(self.package_name) 1123 else: 1124 self.result = result.get_test_results("Error: install hap failed") 1125 LOG.error("Error: install hap failed") 1126 1127 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) 1128 1129 def _execute_hapfile_jsunittest(self): 1130 _unlock_screen(self.config.device) 1131 _unlock_device(self.config.device) 1132 1133 try: 1134 return_message = self.start_hap_execute() 1135 except (ExecuteTerminate, DeviceError) as exception: 1136 return_message = str(exception.args) 1137 1138 return return_message 1139 1140 def _install_hap(self, suite_file): 1141 message = self.config.device.connector_command("install %s" % suite_file) 1142 message = str(message).rstrip() 1143 if message == "" or "success" in message: 1144 return_code = True 1145 if message != "": 1146 LOG.info(message) 1147 else: 1148 return_code = False 1149 if message != "": 1150 LOG.warning(message) 1151 1152 _sleep_according_to_result(return_code) 1153 return return_code 1154 1155 def _uninstall_hap(self, package_name): 1156 return_message = self.config.device.execute_shell_command( 1157 "bm uninstall -n %s" % package_name) 1158 _sleep_according_to_result(return_message) 1159 return return_message 1160 1161 1162@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 1163class OHRustTestDriver(IDriver): 1164 def __init__(self): 1165 self.result = "" 1166 self.error_message = "" 1167 self.config = None 1168 1169 def __check_environment__(self, device_options): 1170 pass 1171 1172 def __check_config__(self, config): 1173 pass 1174 1175 def __execute__(self, request): 1176 try: 1177 LOG.debug("Start to execute open harmony rust test") 1178 self.config = request.config 1179 self.config.device = request.config.environment.devices[0] 1180 self.config.target_test_path = DEFAULT_TEST_PATH 1181 suite_file = request.root.source.source_file 1182 LOG.debug("Testsuite filepath:{}".format(suite_file)) 1183 1184 if not suite_file: 1185 LOG.error("test source '{}' not exists".format( 1186 request.root.source.source_string)) 1187 return 1188 1189 result_save_path = get_result_savepath(suite_file, self.config.report_path) 1190 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 1191 self.config.device.set_device_report_path(request.config.report_path) 1192 self.config.device.device_log_collector.start_hilog_task() 1193 self._init_oh_rust() 1194 self._run_oh_rust(suite_file, request) 1195 except Exception as exception: 1196 self.error_message = exception 1197 if not getattr(exception, "error_no", ""): 1198 setattr(exception, "error_no", "03409") 1199 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1200 finally: 1201 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 1202 time.time_ns()) 1203 log_tar_file_name = "{}_{}".format( 1204 request.get_module_name(), str(serial).replace(":", "_")) 1205 self.config.device.device_log_collector.stop_hilog_task( 1206 log_tar_file_name, module_name=request.get_module_name()) 1207 xml_path = os.path.join( 1208 request.config.report_path, "result", 1209 '.'.join((request.get_module_name(), "xml"))) 1210 shutil.move(xml_path, self.result) 1211 self.result = check_result_report( 1212 request.config.report_path, self.result, self.error_message) 1213 update_xml(request.root.source.source_file, self.result) 1214 1215 def __result__(self): 1216 return self.result if os.path.exists(self.result) else "" 1217 1218 def _init_oh_rust(self): 1219 self.config.device.connector_command("target mount") 1220 self.config.device.execute_shell_command( 1221 "rm -rf %s" % self.config.target_test_path) 1222 self.config.device.execute_shell_command( 1223 "mkdir -p %s" % self.config.target_test_path) 1224 self.config.device.execute_shell_command( 1225 "mount -o rw,remount,rw /") 1226 1227 def _run_oh_rust(self, suite_file, request=None): 1228 self.config.device.push_file(suite_file, self.config.target_test_path) 1229 self.config.device.execute_shell_command( 1230 "hilog -d %s" % (os.path.join(self.config.target_test_path, 1231 os.path.basename(suite_file))) 1232 ) 1233 resource_manager = ResourceManager() 1234 resource_data_dict, resource_dir = \ 1235 resource_manager.get_resource_data_dic(suite_file) 1236 resource_manager.process_preparer_data(resource_data_dict, 1237 resource_dir, 1238 self.config.device) 1239 for listener in request.listeners: 1240 listener.device_sn = self.config.device.device_sn 1241 1242 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 1243 if parsers: 1244 parsers = parsers[:1] 1245 parser_instances = [] 1246 for parser in parsers: 1247 parser_instance = parser.__class__() 1248 parser_instance.suite_name = request.get_module_name() 1249 parser_instance.listeners = request.listeners 1250 parser_instances.append(parser_instance) 1251 handler = ShellHandler(parser_instances) 1252 if self.config.coverage: 1253 command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format( 1254 self.config.target_test_path, os.path.basename(suite_file)) 1255 else: 1256 command = "cd {}; chmod +x *; ./{}".format( 1257 self.config.target_test_path, os.path.basename(suite_file)) 1258 self.config.device.execute_shell_command( 1259 command, timeout=TIME_OUT, receiver=handler, retry=0) 1260 if self.config.coverage: 1261 result = ResultManager(suite_file, self.config) 1262 result.obtain_coverage_data() 1263 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 1264 self.config.device) 1265