1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import re 22import shutil 23import subprocess 24import sys 25import time 26import platform 27import zipfile 28import stat 29import random 30from dataclasses import dataclass 31from json import JSONDecodeError 32 33from xdevice import DeviceTestType, check_result_report 34from xdevice import DeviceLabelType 35from xdevice import CommonParserType 36from xdevice import ExecuteTerminate 37from xdevice import DeviceError 38from xdevice import ShellHandler 39 40from xdevice import IDriver 41from xdevice import platform_logger 42from xdevice import Plugin 43from xdevice import get_plugin 44from ohos.environment.dmlib import process_command_ret 45from core.utils import get_decode 46from core.utils import get_fuzzer_path 47from core.config.resource_manager import ResourceManager 48from core.config.config_manager import FuzzerConfigManager 49 50__all__ = [ 51 "CppTestDriver", 52 "JSUnitTestDriver", 53 "disable_keyguard", 54 "GTestConst"] 55 56LOG = platform_logger("Drivers") 57DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 58OBJ = "obj" 59 60TIME_OUT = 900 * 1000 61JS_TIMEOUT = 10 62CYCLE_TIMES = 30 63 64FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL 65MODES = stat.S_IWUSR | stat.S_IRUSR 66 67 68############################################################################## 69############################################################################## 70 71class CollectingOutputReceiver: 72 def __init__(self): 73 self.output = "" 74 75 def __read__(self, output): 76 self.output = "%s%s" % (self.output, output) 77 78 def __error__(self, message): 79 pass 80 81 def __done__(self, result_code="", message=""): 82 pass 83 84 85class DisplayOutputReceiver: 86 def __init__(self): 87 self.output = "" 88 self.unfinished_line = "" 89 90 def _process_output(self, output, end_mark="\n"): 91 content = output 92 if self.unfinished_line: 93 content = "".join((self.unfinished_line, content)) 94 self.unfinished_line = "" 95 lines = content.split(end_mark) 96 if content.endswith(end_mark): 97 return lines[:-1] 98 else: 99 self.unfinished_line = lines[-1] 100 return lines[:-1] 101 102 def __read__(self, output): 103 self.output = "%s%s" % (self.output, output) 104 lines = self._process_output(output) 105 for line in lines: 106 line = line.strip() 107 if line: 108 LOG.info(get_decode(line)) 109 110 def __error__(self, message): 111 pass 112 113 def __done__(self, result_code="", message=""): 114 pass 115 116 117@dataclass 118class GTestConst(object): 119 exec_para_filter = "--gtest_filter" 120 exec_para_level = "--gtest_testsize" 121 exec_acts_para_filter = "--jstest_filter" 122 exec_acts_para_level = "--jstest_testsize" 123 124 125def get_device_log_file(report_path, serial=None, log_name="device_log"): 126 from xdevice import Variables 127 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 128 os.makedirs(log_path, exist_ok=True) 129 130 serial = serial or time.time_ns() 131 device_file_name = "{}_{}.log".format(log_name, serial) 132 device_log_file = os.path.join(log_path, device_file_name) 133 return device_log_file 134 135 136def get_level_para_string(level_string): 137 level_list = list(set(level_string.split(","))) 138 level_para_string = "" 139 for item in level_list: 140 if not item.isdigit(): 141 continue 142 item = item.strip(" ") 143 level_para_string = f"{level_para_string}Level{item}," 144 level_para_string = level_para_string.strip(",") 145 return level_para_string 146 147 148def get_result_savepath(testsuit_path, result_rootpath): 149 findkey = os.sep + "tests" + os.sep 150 filedir, _ = os.path.split(testsuit_path) 151 pos = filedir.find(findkey) 152 if -1 != pos: 153 subpath = filedir[pos + len(findkey):] 154 pos1 = subpath.find(os.sep) 155 if -1 != pos1: 156 subpath = subpath[pos1 + len(os.sep):] 157 result_path = os.path.join(result_rootpath, "result", subpath) 158 else: 159 result_path = os.path.join(result_rootpath, "result") 160 else: 161 result_path = os.path.join(result_rootpath, "result") 162 163 if not os.path.exists(result_path): 164 os.makedirs(result_path) 165 166 LOG.info("result_savepath = " + result_path) 167 return result_path 168 169 170def get_test_log_savepath(result_rootpath): 171 test_log_path = os.path.join(result_rootpath, "log", "test_log") 172 173 if not os.path.exists(test_log_path): 174 os.makedirs(test_log_path) 175 176 LOG.info("test_log_savepath = {}".format(test_log_path)) 177 return test_log_path 178 179 180# all testsuit common Unavailable test result xml 181def _create_empty_result_file(filepath, filename, error_message): 182 error_message = str(error_message) 183 error_message = error_message.replace("\"", "") 184 error_message = error_message.replace("<", "") 185 error_message = error_message.replace(">", "") 186 error_message = error_message.replace("&", "") 187 if filename.endswith(".hap"): 188 filename = filename.split(".")[0] 189 if not os.path.exists(filepath): 190 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 191 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 192 time.localtime()) 193 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 194 file_desc.write( 195 '<testsuites tests="0" failures="0" ' 196 'disabled="0" errors="0" timestamp="%s" ' 197 'time="0" name="AllTests">\n' % time_stamp) 198 file_desc.write( 199 ' <testsuite name="%s" tests="0" failures="0" ' 200 'disabled="0" errors="0" time="0.0" ' 201 'unavailable="1" message="%s">\n' % 202 (filename, error_message)) 203 file_desc.write(' </testsuite>\n') 204 file_desc.write('</testsuites>\n') 205 return 206 207 208def _unlock_screen(device): 209 device.execute_shell_command("svc power stayon true") 210 time.sleep(1) 211 212 213def _unlock_device(device): 214 device.execute_shell_command("input keyevent 82") 215 time.sleep(1) 216 device.execute_shell_command("wm dismiss-keyguard") 217 time.sleep(1) 218 219 220def _lock_screen(device): 221 device.execute_shell_command("svc power stayon false") 222 time.sleep(1) 223 224 225def disable_keyguard(device): 226 _unlock_screen(device) 227 _unlock_device(device) 228 229 230def _sleep_according_to_result(result): 231 if result: 232 time.sleep(1) 233 234 235def _create_fuzz_crash_file(filepath, filename): 236 if not os.path.exists(filepath): 237 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 238 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 239 time.localtime()) 240 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 241 file_desc.write( 242 '<testsuites disabled="0" name="AllTests" ' 243 'time="300" timestamp="%s" errors="0" ' 244 'failures="1" tests="1">\n' % time_stamp) 245 file_desc.write( 246 ' <testsuite disabled="0" name="%s" time="300" ' 247 'errors="0" failures="1" tests="1">\n' % filename) 248 file_desc.write( 249 ' <testcase name="%s" time="300" classname="%s" ' 250 'status="run">\n' % (filename, filename)) 251 file_desc.write( 252 ' <failure type="" ' 253 'message="Fuzzer crash. See ERROR in log file">\n') 254 file_desc.write(' </failure>\n') 255 file_desc.write(' </testcase>\n') 256 file_desc.write(' </testsuite>\n') 257 file_desc.write('</testsuites>\n') 258 return 259 260 261def _create_fuzz_pass_file(filepath, filename): 262 if not os.path.exists(filepath): 263 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 264 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 265 time.localtime()) 266 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 267 file_desc.write( 268 '<testsuites disabled="0" name="AllTests" ' 269 'time="300" timestamp="%s" errors="0" ' 270 'failures="0" tests="1">\n' % time_stamp) 271 file_desc.write( 272 ' <testsuite disabled="0" name="%s" time="300" ' 273 'errors="0" failures="0" tests="1">\n' % filename) 274 file_desc.write( 275 ' <testcase name="%s" time="300" classname="%s" ' 276 'status="run"/>\n' % (filename, filename)) 277 file_desc.write(' </testsuite>\n') 278 file_desc.write('</testsuites>\n') 279 return 280 281 282def _create_fuzz_result_file(filepath, filename, error_message): 283 error_message = str(error_message) 284 error_message = error_message.replace("\"", "") 285 error_message = error_message.replace("<", "") 286 error_message = error_message.replace(">", "") 287 error_message = error_message.replace("&", "") 288 if "AddressSanitizer" in error_message: 289 LOG.error("FUZZ TEST CRASH") 290 _create_fuzz_crash_file(filepath, filename) 291 elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', 292 error_message, re.M) is not None: 293 LOG.info("FUZZ TEST PASS") 294 _create_fuzz_pass_file(filepath, filename) 295 else: 296 LOG.error("FUZZ TEST UNAVAILABLE") 297 _create_empty_result_file(filepath, filename, error_message) 298 return 299 300 301############################################################################## 302############################################################################## 303 304class ResultManager(object): 305 def __init__(self, testsuit_path, config): 306 self.testsuite_path = testsuit_path 307 self.config = config 308 self.result_rootpath = self.config.report_path 309 self.device = self.config.device 310 if testsuit_path.endswith(".hap"): 311 self.device_testpath = self.config.test_hap_out_path 312 else: 313 self.device_testpath = self.config.target_test_path 314 self.testsuite_name = os.path.basename(self.testsuite_path) 315 self.is_coverage = False 316 317 def set_is_coverage(self, is_coverage): 318 self.is_coverage = is_coverage 319 320 def get_test_results(self, error_message=""): 321 # Get test result files 322 filepath, _ = self.obtain_test_result_file() 323 if "fuzztest" == self.config.testtype[0]: 324 LOG.info("create fuzz test report") 325 _create_fuzz_result_file(filepath, self.testsuite_name, 326 error_message) 327 if not self.is_coverage: 328 self._obtain_fuzz_corpus() 329 330 if not os.path.exists(filepath): 331 _create_empty_result_file(filepath, self.testsuite_name, 332 error_message) 333 if "benchmark" == self.config.testtype[0]: 334 self._obtain_benchmark_result() 335 # Get coverage data files 336 if self.is_coverage: 337 self.obtain_coverage_data() 338 339 return filepath 340 341 def get_test_results_hidelog(self, error_message=""): 342 # Get test result files 343 result_file_path, test_log_path = self.obtain_test_result_file() 344 log_content = "" 345 if not error_message: 346 if os.path.exists(test_log_path): 347 with open(test_log_path, "r") as log: 348 log_content = log.readlines() 349 else: 350 LOG.error("{}: Test log not exist.".format(test_log_path)) 351 else: 352 log_content = error_message 353 354 if "fuzztest" == self.config.testtype[0]: 355 LOG.info("create fuzz test report") 356 _create_fuzz_result_file(result_file_path, self.testsuite_name, 357 log_content) 358 if not self.is_coverage: 359 self._obtain_fuzz_corpus() 360 361 if not os.path.exists(result_file_path): 362 _create_empty_result_file(result_file_path, self.testsuite_name, 363 log_content) 364 if "benchmark" == self.config.testtype[0]: 365 self._obtain_benchmark_result() 366 # Get coverage data files 367 if self.is_coverage: 368 self.obtain_coverage_data() 369 370 return result_file_path 371 372 def _obtain_fuzz_corpus(self): 373 command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" 374 self.config.device.execute_shell_command(command) 375 result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) 376 LOG.info(f"fuzz_dir = {result_save_path}") 377 self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) 378 379 def _obtain_benchmark_result(self): 380 benchmark_root_dir = os.path.abspath( 381 os.path.join(self.result_rootpath, "benchmark")) 382 benchmark_dir = os.path.abspath( 383 os.path.join(benchmark_root_dir, 384 self.get_result_sub_save_path(), 385 self.testsuite_name)) 386 387 if not os.path.exists(benchmark_dir): 388 os.makedirs(benchmark_dir) 389 390 LOG.info("benchmark_dir = %s" % benchmark_dir) 391 self.device.pull_file(os.path.join(self.device_testpath, 392 "%s.json" % self.testsuite_name), benchmark_dir) 393 if not os.path.exists(os.path.join(benchmark_dir, 394 "%s.json" % self.testsuite_name)): 395 os.rmdir(benchmark_dir) 396 return benchmark_dir 397 398 def get_result_sub_save_path(self): 399 find_key = os.sep + "benchmark" + os.sep 400 file_dir, _ = os.path.split(self.testsuite_path) 401 pos = file_dir.find(find_key) 402 subpath = "" 403 if -1 != pos: 404 subpath = file_dir[pos + len(find_key):] 405 LOG.info("subpath = " + subpath) 406 return subpath 407 408 def obtain_test_result_file(self): 409 result_save_path = get_result_savepath(self.testsuite_path, 410 self.result_rootpath) 411 412 result_file_path = os.path.join(result_save_path, 413 "%s.xml" % self.testsuite_name) 414 415 result_josn_file_path = os.path.join(result_save_path, 416 "%s.json" % self.testsuite_name) 417 418 if self.testsuite_path.endswith('.hap'): 419 remote_result_file = os.path.join(self.device_testpath, 420 "testcase_result.xml") 421 remote_json_result_file = os.path.join(self.device_testpath, 422 "%s.json" % self.testsuite_name) 423 else: 424 remote_result_file = os.path.join(self.device_testpath, 425 "%s.xml" % self.testsuite_name) 426 remote_json_result_file = os.path.join(self.device_testpath, 427 "%s.json" % self.testsuite_name) 428 429 if self.config.testtype[0] != "fuzztest": 430 if self.device.is_file_exist(remote_result_file): 431 self.device.pull_file(remote_result_file, result_file_path) 432 elif self.device.is_file_exist(remote_json_result_file): 433 self.device.pull_file(remote_json_result_file, 434 result_josn_file_path) 435 result_file_path = result_josn_file_path 436 else: 437 LOG.info("%s not exist", remote_result_file) 438 439 if self.config.hidelog: 440 remote_log_result_file = os.path.join(self.device_testpath, 441 "%s.log" % self.testsuite_name) 442 test_log_save_path = get_test_log_savepath(self.result_rootpath) 443 test_log_file_path = os.path.join(test_log_save_path, 444 "%s.log" % self.testsuite_name) 445 self.device.pull_file(remote_log_result_file, test_log_file_path) 446 return result_file_path, test_log_file_path 447 448 return result_file_path, "" 449 450 def make_empty_result_file(self, error_message=""): 451 result_savepath = get_result_savepath(self.testsuite_path, 452 self.result_rootpath) 453 result_filepath = os.path.join(result_savepath, "%s.xml" % 454 self.testsuite_name) 455 if not os.path.exists(result_filepath): 456 _create_empty_result_file(result_filepath, 457 self.testsuite_name, error_message) 458 459 def is_exist_target_in_device(self, path, target): 460 if platform.system() == "Windows": 461 command = '\"ls -l %s | grep %s\"' % (path, target) 462 else: 463 command = "ls -l %s | grep %s" % (path, target) 464 465 check_result = False 466 stdout_info = self.device.execute_shell_command(command) 467 if stdout_info != "" and stdout_info.find(target) != -1: 468 check_result = True 469 return check_result 470 471 def obtain_coverage_data(self): 472 cov_root_dir = os.path.abspath(os.path.join( 473 self.result_rootpath, 474 "..", 475 "coverage", 476 "data", 477 "exec")) 478 479 480 tests_path = self.config.testcases_path 481 test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0] 482 cxx_cov_path = os.path.abspath(os.path.join( 483 self.result_rootpath, 484 "..", 485 "coverage", 486 "data", 487 "cxx", 488 self.testsuite_name + '_' + test_type)) 489 490 if os.path.basename(self.testsuite_name).startswith("rust_"): 491 target_name = "lib.unstripped" 492 else: 493 target_name = OBJ 494 if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): 495 if not os.path.exists(cxx_cov_path): 496 os.makedirs(cxx_cov_path) 497 self.config.device.execute_shell_command( 498 "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) 499 src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) 500 self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) 501 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 502 if platform.system() == "Windows": 503 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 504 process.communicate() 505 os.remove(tar_path) 506 os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)) 507 else: 508 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 509 (tar_path, cxx_cov_path), shell=True).communicate() 510 subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate() 511 if target_name != OBJ: 512 subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name), 513 os.path.join(cxx_cov_path, OBJ)), shell=True).communicate() 514 515 516############################################################################## 517############################################################################## 518 519@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 520class CppTestDriver(IDriver): 521 """ 522 CppTest is a Test that runs a native test package on given device. 523 """ 524 # test driver config 525 config = None 526 result = "" 527 528 def __check_environment__(self, device_options): 529 if len(device_options) == 1 and device_options[0].label is None: 530 return True 531 if len(device_options) != 1 or \ 532 device_options[0].label != DeviceLabelType.phone: 533 return False 534 return True 535 536 def __check_config__(self, config): 537 pass 538 539 def __result__(self): 540 return self.result if os.path.exists(self.result) else "" 541 542 def __execute__(self, request): 543 try: 544 self.config = request.config 545 self.config.target_test_path = DEFAULT_TEST_PATH 546 self.config.device = request.config.environment.devices[0] 547 548 suite_file = request.root.source.source_file 549 LOG.debug("Testsuite FilePath: %s" % suite_file) 550 551 if not suite_file: 552 LOG.error("test source '%s' not exists" % 553 request.root.source.source_string) 554 return 555 556 if not self.config.device: 557 result = ResultManager(suite_file, self.config) 558 result.set_is_coverage(False) 559 result.make_empty_result_file( 560 "No test device is found. ") 561 return 562 self.config.device.set_device_report_path(request.config.report_path) 563 self.config.device.device_log_collector.start_hilog_task() 564 self._init_gtest() 565 self._run_gtest(suite_file) 566 567 finally: 568 serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) 569 log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( 570 ":", "_")) 571 self.config.device.device_log_collector.stop_hilog_task(log_tar_file_name) 572 573 def _init_gtest(self): 574 self.config.device.connector_command("target mount") 575 self.config.device.execute_shell_command( 576 "rm -rf %s" % self.config.target_test_path) 577 self.config.device.execute_shell_command( 578 "mkdir -p %s" % self.config.target_test_path) 579 self.config.device.execute_shell_command( 580 "mount -o rw,remount,rw /") 581 if "fuzztest" == self.config.testtype[0]: 582 self.config.device.execute_shell_command( 583 "mkdir -p %s" % os.path.join(self.config.target_test_path, 584 "corpus")) 585 586 def _gtest_command(self, suite_file): 587 filename = os.path.basename(suite_file) 588 test_para = self._get_test_para(self.config.testcase, 589 self.config.testlevel, 590 self.config.testtype, 591 self.config.target_test_path, 592 suite_file, 593 filename) 594 595 # execute testcase 596 if not self.config.coverage: 597 if self.config.random == "random": 598 seed = random.randint(1, 100) 599 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % ( 600 self.config.target_test_path, 601 filename, 602 filename, 603 test_para, 604 seed) 605 else: 606 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( 607 self.config.target_test_path, 608 filename, 609 filename, 610 test_para) 611 else: 612 coverage_outpath = self.config.coverage_outpath 613 if coverage_outpath: 614 strip_num = len(coverage_outpath.strip("/").split("/")) 615 else: 616 ohos_config_path = os.path.join(sys.source_code_root_path, "ohos_config.json") 617 with open(ohos_config_path, 'r') as json_file: 618 json_info = json.load(json_file) 619 out_path = json_info.get("out_path") 620 strip_num = len(out_path.strip("/").split("/")) 621 if "fuzztest" == self.config.testtype[0]: 622 self._push_corpus_cov_if_exist(suite_file) 623 command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ 624 rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX=.; \ 625 GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" 626 else: 627 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \ 628 "GCOV_PREFIX_STRIP=%s ./%s %s" % \ 629 (self.config.target_test_path, 630 filename, 631 str(strip_num), 632 filename, 633 test_para) 634 635 if self.config.hidelog: 636 command += " > {}.log 2>&1".format(filename) 637 638 return command 639 640 def _run_gtest(self, suite_file): 641 from xdevice import Variables 642 is_coverage_test = True if self.config.coverage else False 643 644 # push testsuite file 645 self.config.device.push_file(suite_file, self.config.target_test_path) 646 self._push_corpus_if_exist(suite_file) 647 648 # push resource files 649 resource_manager = ResourceManager() 650 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 651 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 652 self.config.device) 653 654 command = self._gtest_command(suite_file) 655 656 result = ResultManager(suite_file, self.config) 657 result.set_is_coverage(is_coverage_test) 658 659 try: 660 # get result 661 if self.config.hidelog: 662 return_message = "" 663 display_receiver = CollectingOutputReceiver() 664 self.config.device.execute_shell_command( 665 command, 666 receiver=display_receiver, 667 timeout=TIME_OUT, 668 retry=0) 669 else: 670 display_receiver = DisplayOutputReceiver() 671 self.config.device.execute_shell_command( 672 command, 673 receiver=display_receiver, 674 timeout=TIME_OUT, 675 retry=0) 676 return_message = display_receiver.output 677 except (ExecuteTerminate, DeviceError) as exception: 678 return_message = str(exception.args) 679 680 if self.config.hidelog: 681 self.result = result.get_test_results_hidelog(return_message) 682 else: 683 self.result = result.get_test_results(return_message) 684 685 resource_manager.process_cleaner_data(resource_data_dic, 686 resource_dir, 687 self.config.device) 688 689 @staticmethod 690 def _alter_init(name): 691 with open(name, "rb") as f: 692 lines = f.read() 693 str_content = lines.decode("utf-8") 694 695 pattern_sharp = '^\s*#.*$(\n|\r\n)' 696 pattern_star = '/\*.*?\*/(\n|\r\n)+' 697 pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+' 698 699 if re.match(pattern_sharp, str_content, flags=re.M): 700 striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) 701 elif re.findall(pattern_star, str_content, flags=re.S): 702 striped_content = re.sub(pattern_star, '', str_content, flags=re.S) 703 elif re.findall(pattern_xml, str_content, flags=re.S): 704 striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) 705 else: 706 striped_content = str_content 707 708 striped_bt = striped_content.encode("utf-8") 709 if os.path.exists(name): 710 os.remove(name) 711 with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: 712 f.write(striped_bt) 713 714 def _push_corpus_cov_if_exist(self, suite_file): 715 cov_file = suite_file + "_corpus.tar.gz" 716 LOG.info("corpus_cov file :%s" % str(cov_file)) 717 self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) 718 719 def _push_corpus_if_exist(self, suite_file): 720 if "fuzztest" == self.config.testtype[0]: 721 corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") 722 if not os.path.isdir(corpus_path): 723 return 724 725 corpus_dirs = [] 726 corpus_file_list = [] 727 728 for root, _, files in os.walk(corpus_path): 729 if not files: 730 continue 731 732 corpus_dir = root.split("corpus")[-1] 733 if corpus_dir != "": 734 corpus_dirs.append(corpus_dir) 735 736 for file in files: 737 cp_file = os.path.normcase(os.path.join(root, file)) 738 corpus_file_list.append(cp_file) 739 if file == "init": 740 self._alter_init(cp_file) 741 742 # mkdir corpus files dir 743 if corpus_dirs: 744 for corpus in corpus_dirs: 745 mkdir_corpus_command = f"shell; mkdir -p {corpus}" 746 self.config.device.connector_command(mkdir_corpus_command) 747 748 # push corpus file 749 if corpus_file_list: 750 for corpus_file in corpus_file_list: 751 self.config.device.push_file(corpus_file, 752 os.path.join(self.config.target_test_path, "corpus")) 753 754 def _get_test_para(self, 755 testcase, 756 testlevel, 757 testtype, 758 target_test_path, 759 suite_file, 760 filename): 761 if "benchmark" == testtype[0]: 762 test_para = (" --benchmark_out_format=json" 763 " --benchmark_out=%s%s.json") % ( 764 target_test_path, filename) 765 return test_para 766 767 if "" != testcase and "" == testlevel: 768 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 769 elif "" == testcase and "" != testlevel: 770 level_para = get_level_para_string(testlevel) 771 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 772 else: 773 test_para = "" 774 775 if "fuzztest" == testtype[0]: 776 cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( 777 suite_file), "project.xml")).get_fuzzer_config("fuzztest") 778 LOG.info("config list :%s" % str(cfg_list)) 779 if self.config.coverage: 780 test_para += "corpus -runs=0" + \ 781 " -max_len=" + cfg_list[0] + \ 782 " -max_total_time=" + cfg_list[1] + \ 783 " -rss_limit_mb=" + cfg_list[2] 784 else: 785 test_para += "corpus -max_len=" + cfg_list[0] + \ 786 " -max_total_time=" + cfg_list[1] + \ 787 " -rss_limit_mb=" + cfg_list[2] 788 789 return test_para 790 791 792############################################################################## 793############################################################################## 794 795@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 796class JSUnitTestDriver(IDriver): 797 """ 798 JSUnitTestDriver is a Test that runs a native test package on given device. 799 """ 800 801 def __init__(self): 802 self.config = None 803 self.result = "" 804 self.start_time = None 805 self.ability_name = "" 806 self.package_name = "" 807 # log 808 self.hilog = None 809 self.hilog_proc = None 810 811 def __check_environment__(self, device_options): 812 pass 813 814 def __check_config__(self, config): 815 pass 816 817 def __result__(self): 818 return self.result if os.path.exists(self.result) else "" 819 820 def __execute__(self, request): 821 try: 822 LOG.info("developertest driver") 823 self.config = request.config 824 self.config.target_test_path = DEFAULT_TEST_PATH 825 self.config.device = request.config.environment.devices[0] 826 827 suite_file = request.root.source.source_file 828 result_save_path = get_result_savepath(suite_file, self.config.report_path) 829 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 830 if not suite_file: 831 LOG.error("test source '%s' not exists" % 832 request.root.source.source_string) 833 return 834 835 if not self.config.device: 836 result = ResultManager(suite_file, self.config) 837 result.set_is_coverage(False) 838 result.make_empty_result_file( 839 "No test device is found") 840 return 841 842 package_name, ability_name = self._get_package_and_ability_name( 843 suite_file) 844 self.package_name = package_name 845 self.ability_name = ability_name 846 self.config.test_hap_out_path = \ 847 "/data/data/%s/files/" % self.package_name 848 self.config.device.connector_command("shell hilog -r") 849 850 self.hilog = get_device_log_file( 851 request.config.report_path, 852 request.config.device.__get_serial__() + "_" + request. 853 get_module_name(), 854 "device_hilog") 855 856 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 857 0o755) 858 859 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 860 self.config.device.device_log_collector.add_log_address(None, self.hilog) 861 _, self.hilog_proc = self.config.device.device_log_collector.\ 862 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 863 self._init_jsunit_test() 864 self._run_jsunit(suite_file, self.hilog) 865 hilog_file_pipe.flush() 866 self.generate_console_output(self.hilog, request) 867 xml_path = os.path.join( 868 request.config.report_path, "result", 869 '.'.join((request.get_module_name(), "xml"))) 870 shutil.move(xml_path, self.result) 871 finally: 872 self.config.device.device_log_collector.remove_log_address(None, self.hilog) 873 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 874 875 def _init_jsunit_test(self): 876 self.config.device.connector_command("target mount") 877 self.config.device.execute_shell_command( 878 "rm -rf %s" % self.config.target_test_path) 879 self.config.device.execute_shell_command( 880 "mkdir -p %s" % self.config.target_test_path) 881 self.config.device.execute_shell_command( 882 "mount -o rw,remount,rw /") 883 884 def _run_jsunit(self, suite_file, device_log_file): 885 filename = os.path.basename(suite_file) 886 _, suffix_name = os.path.splitext(filename) 887 888 resource_manager = ResourceManager() 889 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 890 if suffix_name == ".hap": 891 json_file_path = suite_file.replace(".hap", ".json") 892 if os.path.exists(json_file_path): 893 timeout = self._get_json_shell_timeout(json_file_path) 894 else: 895 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 896 else: 897 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 898 resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) 899 main_result = self._install_hap(suite_file) 900 result = ResultManager(suite_file, self.config) 901 if main_result: 902 self._execute_hapfile_jsunittest() 903 try: 904 status = False 905 actiontime = JS_TIMEOUT 906 times = CYCLE_TIMES 907 if timeout: 908 actiontime = timeout 909 times = 1 910 device_log_file_open = os.open(device_log_file, os.O_RDONLY, stat.S_IWUSR | stat.S_IRUSR) 911 with os.fdopen(device_log_file_open, "r", encoding='utf-8') \ 912 as file_read_pipe: 913 for i in range(0, times): 914 if status: 915 break 916 else: 917 time.sleep(float(actiontime)) 918 start_time = int(time.time()) 919 while True: 920 data = file_read_pipe.readline() 921 if data.find("JSApp:") != -1 and data.find("[end] run suites end") != -1: 922 LOG.info("execute testcase successfully.") 923 status = True 924 break 925 if int(time.time()) - start_time > 5: 926 break 927 finally: 928 _lock_screen(self.config.device) 929 self._uninstall_hap(self.package_name) 930 else: 931 self.result = result.get_test_results("Error: install hap failed") 932 LOG.error("Error: install hap failed") 933 934 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) 935 936 def generate_console_output(self, device_log_file, request): 937 result_message = self.read_device_log(device_log_file) 938 939 report_name = request.get_module_name() 940 parsers = get_plugin( 941 Plugin.PARSER, CommonParserType.jsunit) 942 if parsers: 943 parsers = parsers[:1] 944 for listener in request.listeners: 945 listener.device_sn = self.config.device.device_sn 946 parser_instances = [] 947 948 for parser in parsers: 949 parser_instance = parser.__class__() 950 parser_instance.suites_name = report_name 951 parser_instance.suite_name = report_name 952 parser_instance.listeners = request.listeners 953 parser_instances.append(parser_instance) 954 handler = ShellHandler(parser_instances) 955 process_command_ret(result_message, handler) 956 957 def read_device_log(self, device_log_file): 958 device_log_file_open = os.open(device_log_file, os.O_RDONLY, 959 stat.S_IWUSR | stat.S_IRUSR) 960 961 result_message = "" 962 with os.fdopen(device_log_file_open, "r", encoding='utf-8') \ 963 as file_read_pipe: 964 while True: 965 data = file_read_pipe.readline() 966 if not data: 967 break 968 # only filter JSApp log 969 if data.find("JSApp:") != -1: 970 result_message += data 971 if data.find("[end] run suites end") != -1: 972 break 973 return result_message 974 975 def _execute_hapfile_jsunittest(self): 976 _unlock_screen(self.config.device) 977 _unlock_device(self.config.device) 978 979 try: 980 return_message = self.start_hap_execute() 981 except (ExecuteTerminate, DeviceError) as exception: 982 return_message = str(exception.args) 983 984 return return_message 985 986 def _install_hap(self, suite_file): 987 message = self.config.device.connector_command("install %s" % suite_file) 988 message = str(message).rstrip() 989 if message == "" or "success" in message: 990 return_code = True 991 if message != "": 992 LOG.info(message) 993 else: 994 return_code = False 995 if message != "": 996 LOG.warning(message) 997 998 _sleep_according_to_result(return_code) 999 return return_code 1000 1001 def start_hap_execute(self): 1002 try: 1003 command = "aa start -d 123 -a %s.MainAbility -b %s" \ 1004 % (self.package_name, self.package_name) 1005 self.start_time = time.time() 1006 result_value = self.config.device.execute_shell_command( 1007 command, timeout=TIME_OUT) 1008 1009 if "success" in str(result_value).lower(): 1010 LOG.info("execute %s's testcase success. result value=%s" 1011 % (self.package_name, result_value)) 1012 else: 1013 LOG.info("execute %s's testcase failed. result value=%s" 1014 % (self.package_name, result_value)) 1015 1016 _sleep_according_to_result(result_value) 1017 return_message = result_value 1018 except (ExecuteTerminate, DeviceError) as exception: 1019 return_message = exception.args 1020 1021 return return_message 1022 1023 def _uninstall_hap(self, package_name): 1024 return_message = self.config.device.execute_shell_command( 1025 "bm uninstall -n %s" % package_name) 1026 _sleep_according_to_result(return_message) 1027 return return_message 1028 1029 @staticmethod 1030 def _get_acts_test_para(testcase, 1031 testlevel, 1032 testtype, 1033 target_test_path, 1034 suite_file, 1035 filename): 1036 if "actstest" == testtype[0]: 1037 test_para = (" --actstest_out_format=json" 1038 " --actstest_out=%s%s.json") % ( 1039 target_test_path, filename) 1040 return test_para 1041 1042 if "" != testcase and "" == testlevel: 1043 test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) 1044 elif "" == testcase and "" != testlevel: 1045 level_para = get_level_para_string(testlevel) 1046 test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) 1047 else: 1048 test_para = "" 1049 return test_para 1050 1051 @staticmethod 1052 def _get_hats_test_para(testcase, 1053 testlevel, 1054 testtype, 1055 target_test_path, 1056 suite_file, 1057 filename): 1058 if "hatstest" == testtype[0]: 1059 test_hats_para = (" --hatstest_out_format=json" 1060 " --hatstest_out=%s%s.json") % ( 1061 target_test_path, filename) 1062 return test_hats_para 1063 1064 if "" != testcase and "" == testlevel: 1065 test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 1066 elif "" == testcase and "" != testlevel: 1067 level_para = get_level_para_string(testlevel) 1068 test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 1069 else: 1070 test_hats_para = "" 1071 return test_hats_para 1072 1073 @classmethod 1074 def _get_json_shell_timeout(cls, json_filepath): 1075 test_timeout = 300 1076 try: 1077 with open(json_filepath, 'r') as json_file: 1078 data_dic = json.load(json_file) 1079 if not data_dic: 1080 return test_timeout 1081 else: 1082 if "driver" in data_dic.keys(): 1083 driver_dict = data_dic.get("driver") 1084 if driver_dict and "test-timeout" in driver_dict.keys(): 1085 test_timeout = int(driver_dict["shell-timeout"]) / 1000 1086 return test_timeout 1087 except JSONDecodeError: 1088 return test_timeout 1089 finally: 1090 print(" get json shell timeout finally") 1091 1092 @staticmethod 1093 def _get_package_and_ability_name(hap_filepath): 1094 package_name = "" 1095 ability_name = "" 1096 if os.path.exists(hap_filepath): 1097 filename = os.path.basename(hap_filepath) 1098 1099 # unzip the hap file 1100 hap_bak_path = os.path.abspath(os.path.join( 1101 os.path.dirname(hap_filepath), 1102 "%s.bak" % filename)) 1103 zf_desc = zipfile.ZipFile(hap_filepath) 1104 try: 1105 zf_desc.extractall(path=hap_bak_path) 1106 except RuntimeError as error: 1107 print("Unzip error:", hap_bak_path) 1108 zf_desc.close() 1109 1110 # verify config.json file 1111 app_profile_path = os.path.join(hap_bak_path, "config.json") 1112 if not os.path.exists(app_profile_path): 1113 print("file %s not exist" % app_profile_path) 1114 return package_name, ability_name 1115 1116 if os.path.isdir(app_profile_path): 1117 print("%s is a folder, and not a file" % app_profile_path) 1118 return package_name, ability_name 1119 1120 # get package_name and ability_name value 1121 load_dict = {} 1122 with open(app_profile_path, 'r') as load_f: 1123 load_dict = json.load(load_f) 1124 profile_list = load_dict.values() 1125 for profile in profile_list: 1126 package_name = profile.get("package") 1127 if not package_name: 1128 continue 1129 abilities = profile.get("abilities") 1130 for abilitie in abilities: 1131 abilities_name = abilitie.get("name") 1132 if abilities_name.startswith("."): 1133 ability_name = package_name + abilities_name[ 1134 abilities_name.find("."):] 1135 else: 1136 ability_name = abilities_name 1137 break 1138 break 1139 1140 # delete hap_bak_path 1141 if os.path.exists(hap_bak_path): 1142 shutil.rmtree(hap_bak_path) 1143 else: 1144 print("file %s not exist" % hap_filepath) 1145 return package_name, ability_name 1146 1147 1148@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 1149class OHRustTestDriver(IDriver): 1150 def __init__(self): 1151 self.result = "" 1152 self.error_message = "" 1153 self.config = None 1154 1155 def __check_environment__(self, device_options): 1156 pass 1157 1158 def __check_config__(self, config): 1159 pass 1160 1161 def __execute__(self, request): 1162 try: 1163 LOG.debug("Start to execute open harmony rust test") 1164 self.config = request.config 1165 self.config.device = request.config.environment.devices[0] 1166 self.config.target_test_path = DEFAULT_TEST_PATH 1167 1168 suite_file = request.root.source.source_file 1169 LOG.debug("Testsuite filepath:{}".format(suite_file)) 1170 1171 if not suite_file: 1172 LOG.error("test source '{}' not exists".format( 1173 request.root.source.source_string)) 1174 return 1175 1176 self.result = "{}.xml".format( 1177 os.path.join(request.config.report_path, 1178 "result", request.get_module_name())) 1179 self.config.device.set_device_report_path(request.config.report_path) 1180 self.config.device.device_log_collector.start_hilog_task() 1181 self._init_oh_rust() 1182 self._run_oh_rust(suite_file, request) 1183 except Exception as exception: 1184 self.error_message = exception 1185 if not getattr(exception, "error_no", ""): 1186 setattr(exception, "error_no", "03409") 1187 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1188 finally: 1189 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 1190 time.time_ns()) 1191 log_tar_file_name = "{}_{}".format( 1192 request.get_module_name(), str(serial).replace(":", "_")) 1193 self.config.device.device_log_collector.stop_hilog_task( 1194 log_tar_file_name) 1195 self.result = check_result_report( 1196 request.config.report_path, self.result, self.error_message) 1197 1198 def _init_oh_rust(self): 1199 self.config.device.connector_command("target mount") 1200 self.config.device.execute_shell_command( 1201 "mount -o rw,remount,rw /") 1202 1203 def _run_oh_rust(self, suite_file, request=None): 1204 self.config.device.push_file(suite_file, self.config.target_test_path) 1205 resource_manager = ResourceManager() 1206 resource_data_dict, resource_dir = \ 1207 resource_manager.get_resource_data_dic(suite_file) 1208 resource_manager.process_preparer_data(resource_data_dict, 1209 resource_dir, 1210 self.config.device) 1211 for listener in request.listeners: 1212 listener.device_sn = self.config.device.device_sn 1213 1214 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 1215 if parsers: 1216 parsers = parsers[:1] 1217 parser_instances = [] 1218 for parser in parsers: 1219 parser_instance = parser.__class__() 1220 parser_instance.suite_name = request.get_module_name() 1221 parser_instance.listeners = request.listeners 1222 parser_instances.append(parser_instance) 1223 handler = ShellHandler(parser_instances) 1224 if self.config.coverage: 1225 command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format( 1226 self.config.target_test_path, os.path.basename(suite_file)) 1227 else: 1228 command = "cd {}; chmod +x *; ./{}".format( 1229 self.config.target_test_path, os.path.basename(suite_file)) 1230 self.config.device.execute_shell_command( 1231 command, timeout=TIME_OUT, receiver=handler, retry=0) 1232 if self.config.coverage: 1233 result = ResultManager(suite_file, self.config) 1234 result.obtain_coverage_data() 1235 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 1236 self.config.device) 1237 1238 def __result__(self): 1239 return self.result if os.path.exists(self.result) else "" 1240