• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import re
22import shutil
23import subprocess
24import sys
25import time
26import platform
27import zipfile
28import stat
29import random
30import xml.etree.ElementTree as ET
31from dataclasses import dataclass
32from json import JSONDecodeError
33
34from xdevice import DeviceTestType, check_result_report
35from xdevice import DeviceLabelType
36from xdevice import CommonParserType
37from xdevice import ExecuteTerminate
38from xdevice import DeviceError
39from xdevice import ShellHandler
40
41from xdevice import IDriver
42from xdevice import platform_logger
43from xdevice import Plugin
44from xdevice import get_plugin
45from ohos.environment.dmlib import process_command_ret
46from core.utils import get_decode
47from core.utils import get_fuzzer_path
48from core.config.resource_manager import ResourceManager
49from core.config.config_manager import FuzzerConfigManager
50
51__all__ = [
52    "CppTestDriver",
53    "JSUnitTestDriver",
54    "disable_keyguard",
55    "GTestConst"]
56
57LOG = platform_logger("Drivers")
58DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
59OBJ = "obj"
60_ACE_LOG_MARKER = " a0c0d0"
61TIME_OUT = 900 * 1000
62JS_TIMEOUT = 10
63CYCLE_TIMES = 50
64
65FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL
66MODES = stat.S_IWUSR | stat.S_IRUSR
67
68
69class CollectingOutputReceiver:
70    def __init__(self):
71        self.output = ""
72
73    def __read__(self, output):
74        self.output = "%s%s" % (self.output, output)
75
76    def __error__(self, message):
77        pass
78
79    def __done__(self, result_code="", message=""):
80        pass
81
82
83class DisplayOutputReceiver:
84    def __init__(self):
85        self.output = ""
86        self.unfinished_line = ""
87
88    def __read__(self, output):
89        self.output = "%s%s" % (self.output, output)
90        lines = self._process_output(output)
91        for line in lines:
92            line = line.strip()
93            if line:
94                LOG.info(get_decode(line))
95
96    def __error__(self, message):
97        pass
98
99    def __done__(self, result_code="", message=""):
100        pass
101
102    def _process_output(self, output, end_mark="\n"):
103        content = output
104        if self.unfinished_line:
105            content = "".join((self.unfinished_line, content))
106            self.unfinished_line = ""
107        lines = content.split(end_mark)
108        if content.endswith(end_mark):
109            return lines[:-1]
110        else:
111            self.unfinished_line = lines[-1]
112            return lines[:-1]
113
114
115@dataclass
116class GTestConst(object):
117    exec_para_filter = "--gtest_filter"
118    exec_para_level = "--gtest_testsize"
119    exec_acts_para_filter = "--jstest_filter"
120    exec_acts_para_level = "--jstest_testsize"
121
122
123def get_device_log_file(report_path, serial=None, log_name="device_log"):
124    from xdevice import Variables
125    log_path = os.path.join(report_path, Variables.report_vars.log_dir)
126    os.makedirs(log_path, exist_ok=True)
127
128    serial = serial or time.time_ns()
129    device_file_name = "{}_{}.log".format(log_name, serial)
130    device_log_file = os.path.join(log_path, device_file_name)
131    return device_log_file
132
133
134def get_level_para_string(level_string):
135    level_list = list(set(level_string.split(",")))
136    level_para_string = ""
137    for item in level_list:
138        if not item.isdigit():
139            continue
140        item = item.strip(" ")
141        level_para_string = f"{level_para_string}Level{item},"
142    level_para_string = level_para_string.strip(",")
143    return level_para_string
144
145
146def get_result_savepath(testsuit_path, result_rootpath):
147    findkey = os.sep + "tests" + os.sep
148    filedir, _ = os.path.split(testsuit_path)
149    pos = filedir.find(findkey)
150    if -1 != pos:
151        subpath = filedir[pos + len(findkey):]
152        pos1 = subpath.find(os.sep)
153        if -1 != pos1:
154            subpath = subpath[pos1 + len(os.sep):]
155            result_path = os.path.join(result_rootpath, "result", subpath)
156        else:
157            result_path = os.path.join(result_rootpath, "result")
158    else:
159        result_path = os.path.join(result_rootpath, "result")
160
161    if not os.path.exists(result_path):
162        os.makedirs(result_path)
163
164    LOG.info("result_savepath = " + result_path)
165    return result_path
166
167
168def get_test_log_savepath(result_rootpath, result_suit_path):
169    suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0]
170    test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path)
171
172    if not os.path.exists(test_log_path):
173        os.makedirs(test_log_path)
174
175    LOG.info("test_log_savepath = {}".format(test_log_path))
176    return test_log_path
177
178
179def update_xml(suite_file, result_xml):
180    suite_path_txt = suite_file.split(".")[0] + "_path.txt"
181    if os.path.exists(suite_path_txt) and os.path.exists(result_xml):
182        with open(suite_path_txt, "r") as path_text:
183            line = path_text.readline().replace("//", "").strip()
184        tree = ET.parse(result_xml)
185        tree.getroot().attrib["path"] = line
186        tree.getroot().attrib["name"] = os.path.basename(suite_file.split(".")[0])
187
188        test_suit = os.path.basename(result_xml).split(".")[0]
189        log_path = os.path.abspath(result_xml).split("result")[0]
190        crash_path = os.path.join(log_path, "log", test_suit)
191        all_items = os.listdir(crash_path)
192
193        # 筛选以crash开头的目录
194        matching_dirs = [os.path.join(crash_path, item) for item in all_items if
195                         os.path.isdir(os.path.join(crash_path, item))
196                         and item.startswith(f"crash_log_{test_suit}")]
197        if len(matching_dirs) >= 1:
198            tree.getroot().attrib["is_crash"] = "1"
199        tree.write(result_xml)
200
201
202def remove_color_codes(text):
203    """
204    color ascii to utf-8
205    Args:
206         text:
207
208    Returns:
209
210    """
211    return text.encode('unicode_escape').decode('utf-8')
212
213
214# all testsuit common Unavailable test result xml
215def _create_empty_result_file(filepath, filename, error_message):
216    error_message = remove_color_codes(error_message)
217    error_message = error_message.replace("\"", "")
218    error_message = error_message.replace("<", "")
219    error_message = error_message.replace(">", "")
220    error_message = error_message.replace("&", "")
221    if filename.endswith(".hap"):
222        filename = filename.split(".")[0]
223    if not os.path.exists(filepath):
224        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
225            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
226                                       time.localtime())
227            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
228            file_desc.write(
229                '<testsuites tests="0" failures="0" '
230                'disabled="0" errors="0" timestamp="%s" '
231                'time="0" name="%s" unavailable="1">\n' % (time_stamp, filename))
232            file_desc.write(
233                '  <testsuite name="%s" tests="0" failures="0" '
234                'disabled="0" errors="0" time="0.0" '
235                'unavailable="1" message="%s">\n' %
236                (filename, error_message))
237            file_desc.write('  </testsuite>\n')
238            file_desc.write('</testsuites>\n')
239    return
240
241
242def _unlock_screen(device):
243    device.execute_shell_command("svc power stayon true")
244    time.sleep(1)
245
246
247def _unlock_device(device):
248    device.execute_shell_command("input keyevent 82")
249    time.sleep(1)
250    device.execute_shell_command("wm dismiss-keyguard")
251    time.sleep(1)
252
253
254def _lock_screen(device):
255    device.execute_shell_command("svc power stayon false")
256    time.sleep(1)
257
258
259def disable_keyguard(device):
260    _unlock_screen(device)
261    _unlock_device(device)
262
263
264def _sleep_according_to_result(result):
265    if result:
266        time.sleep(1)
267
268
269def _create_fuzz_crash_file(filepath, filename):
270    if not os.path.exists(filepath):
271        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
272            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
273                                       time.localtime())
274            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
275            file_desc.write(
276                '<testsuites disabled="0" name="%s" '
277                'time="300" timestamp="%s" errors="0" '
278                'failures="1" tests="1">\n' % (filename, time_stamp))
279            file_desc.write(
280                '  <testsuite disabled="0" name="%s" time="300" '
281                'errors="0" failures="1" tests="1">\n' % filename)
282            file_desc.write(
283                '    <testcase name="%s" time="300" classname="%s" '
284                'status="run">\n' % (filename, filename))
285            file_desc.write(
286                '      <failure type="" '
287                'message="Fuzzer crash. See ERROR in log file">\n')
288            file_desc.write('      </failure>\n')
289            file_desc.write('    </testcase>\n')
290            file_desc.write('  </testsuite>\n')
291            file_desc.write('</testsuites>\n')
292    return
293
294
295def _create_fuzz_pass_file(filepath, filename):
296    if not os.path.exists(filepath):
297        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
298            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
299                                       time.localtime())
300            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
301            file_desc.write(
302                '<testsuites disabled="0" name="%s" '
303                'time="300" timestamp="%s" errors="0" '
304                'failures="0" tests="1">\n' % (filename, time_stamp))
305            file_desc.write(
306                '  <testsuite disabled="0" name="%s" time="300" '
307                'errors="0" failures="0" tests="1">\n' % filename)
308            file_desc.write(
309                '    <testcase name="%s" time="300" classname="%s" '
310                'status="run"/>\n' % (filename, filename))
311            file_desc.write('  </testsuite>\n')
312            file_desc.write('</testsuites>\n')
313    return
314
315
316def _create_fuzz_result_file(filepath, filename, error_message):
317    error_message = str(error_message)
318    error_message = error_message.replace("\"", "")
319    error_message = error_message.replace("<", "")
320    error_message = error_message.replace(">", "")
321    error_message = error_message.replace("&", "")
322    if "AddressSanitizer" in error_message:
323        LOG.error("FUZZ TEST CRASH")
324        _create_fuzz_crash_file(filepath, filename)
325    elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
326                   error_message, re.M) is not None:
327        LOG.info("FUZZ TEST PASS")
328        _create_fuzz_pass_file(filepath, filename)
329    else:
330        LOG.error("FUZZ TEST UNAVAILABLE")
331        _create_empty_result_file(filepath, filename, error_message)
332    return
333
334
335class ResultManager(object):
336    def __init__(self, testsuit_path, config):
337        self.testsuite_path = testsuit_path
338        self.config = config
339        self.result_rootpath = self.config.report_path
340        self.device = self.config.device
341        if testsuit_path.endswith(".hap"):
342            self.device_testpath = self.config.test_hap_out_path
343        else:
344            self.device_testpath = self.config.target_test_path
345        self.testsuite_name = os.path.basename(self.testsuite_path)
346        self.is_coverage = False
347
348    def set_is_coverage(self, is_coverage):
349        self.is_coverage = is_coverage
350
351    def get_test_results(self, error_message=""):
352        # Get test result files
353        filepath, _ = self.obtain_test_result_file()
354        if "fuzztest" == self.config.testtype[0]:
355            LOG.info("create fuzz test report")
356            _create_fuzz_result_file(filepath, self.testsuite_name,
357                                     error_message)
358            if not self.is_coverage:
359                self._obtain_fuzz_corpus()
360
361        if not os.path.exists(filepath):
362            _create_empty_result_file(filepath, self.testsuite_name,
363                                      error_message)
364        if "benchmark" == self.config.testtype[0]:
365            self._obtain_benchmark_result()
366        # Get coverage data files
367        if self.is_coverage:
368            self.obtain_coverage_data()
369
370        return filepath
371
372    def get_test_results_hidelog(self, error_message=""):
373        # Get test result files
374        result_file_path, test_log_path = self.obtain_test_result_file()
375        log_content = ""
376        if not error_message:
377            if os.path.exists(test_log_path):
378                with open(test_log_path, "r") as log:
379                    log_content = log.readlines()
380            else:
381                LOG.error("{}: Test log not exist.".format(test_log_path))
382        else:
383            log_content = error_message
384
385        if "fuzztest" == self.config.testtype[0]:
386            LOG.info("create fuzz test report")
387            _create_fuzz_result_file(result_file_path, self.testsuite_name,
388                                     log_content)
389            if not self.is_coverage:
390                self._obtain_fuzz_corpus()
391
392        if not os.path.exists(result_file_path):
393            _create_empty_result_file(result_file_path, self.testsuite_name,
394                                      log_content)
395        if "benchmark" == self.config.testtype[0]:
396            self._obtain_benchmark_result()
397        # Get coverage data files
398        if self.is_coverage:
399            self.obtain_coverage_data()
400
401        return result_file_path
402
403    def get_result_sub_save_path(self):
404        find_key = os.sep + "benchmark" + os.sep
405        file_dir, _ = os.path.split(self.testsuite_path)
406        pos = file_dir.find(find_key)
407        subpath = ""
408        if -1 != pos:
409            subpath = file_dir[pos + len(find_key):]
410        LOG.info("subpath = " + subpath)
411        return subpath
412
413    def obtain_test_result_file(self):
414        result_save_path = get_result_savepath(self.testsuite_path,
415                                               self.result_rootpath)
416
417        result_file_path = os.path.join(result_save_path,
418                                        "%s.xml" % self.testsuite_name)
419
420        result_josn_file_path = os.path.join(result_save_path,
421                                             "%s.json" % self.testsuite_name)
422
423        if self.testsuite_path.endswith('.hap'):
424            remote_result_file = os.path.join(self.device_testpath,
425                                              "testcase_result.xml")
426            remote_json_result_file = os.path.join(self.device_testpath,
427                                                   "%s.json" % self.testsuite_name)
428        else:
429            remote_result_file = os.path.join(self.device_testpath,
430                                              "%s.xml" % self.testsuite_name)
431            remote_json_result_file = os.path.join(self.device_testpath,
432                                                   "%s.json" % self.testsuite_name)
433
434        if self.config.testtype[0] != "fuzztest":
435            if self.device.is_file_exist(remote_result_file):
436                self.device.pull_file(remote_result_file, result_file_path)
437            elif self.device.is_file_exist(remote_json_result_file):
438                self.device.pull_file(remote_json_result_file,
439                                      result_josn_file_path)
440                result_file_path = result_josn_file_path
441            else:
442                LOG.info("%s not exist", remote_result_file)
443
444        if self.config.hidelog:
445            remote_log_result_file = os.path.join(self.device_testpath,
446                                                  "%s.log" % self.testsuite_name)
447            test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path)
448            test_log_file_path = os.path.join(test_log_save_path,
449                                              "%s.log" % self.testsuite_name)
450            self.device.pull_file(remote_log_result_file, test_log_file_path)
451            return result_file_path, test_log_file_path
452
453        return result_file_path, ""
454
455    def make_empty_result_file(self, error_message=""):
456        result_savepath = get_result_savepath(self.testsuite_path,
457                                              self.result_rootpath)
458        result_filepath = os.path.join(result_savepath, "%s.xml" %
459                                       self.testsuite_name)
460        if not os.path.exists(result_filepath):
461            _create_empty_result_file(result_filepath,
462                                      self.testsuite_name, error_message)
463
464    def is_exist_target_in_device(self, path, target):
465        if platform.system() == "Windows":
466            command = '\"ls -l %s | grep %s\"' % (path, target)
467        else:
468            command = "ls -l %s | grep %s" % (path, target)
469
470        check_result = False
471        stdout_info = self.device.execute_shell_command(command)
472        if stdout_info != "" and stdout_info.find(target) != -1:
473            check_result = True
474        return check_result
475
476    def obtain_coverage_data(self):
477        cov_root_dir = os.path.abspath(os.path.join(
478            self.result_rootpath,
479            "..",
480            "coverage",
481            "data",
482            "exec"))
483
484
485        tests_path = self.config.testcases_path
486        test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0]
487        cxx_cov_path = os.path.abspath(os.path.join(
488            self.result_rootpath,
489            "..",
490            "coverage",
491            "data",
492            "cxx",
493            self.testsuite_name + '_' + test_type))
494
495        if os.path.basename(self.testsuite_name).startswith("rust_"):
496            target_name = "lib.unstripped"
497        else:
498            target_name = OBJ
499        if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
500            if not os.path.exists(cxx_cov_path):
501                os.makedirs(cxx_cov_path)
502            else:
503                cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}"
504            self.config.device.execute_shell_command(
505                "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name))
506            src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name)
507            self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT)
508            tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)
509            if platform.system() == "Windows":
510                process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True)
511                process.communicate()
512                os.remove(tar_path)
513                os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ))
514            else:
515                subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" %
516                                 (tar_path, cxx_cov_path), shell=True).communicate()
517                subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate()
518                if target_name != OBJ:
519                    subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name),
520                                                   os.path.join(cxx_cov_path, OBJ)), shell=True).communicate()
521
522    def _obtain_fuzz_corpus(self):
523        command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;"
524        self.config.device.execute_shell_command(command)
525        result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath)
526        LOG.info(f"fuzz_dir = {result_save_path}")
527        self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path)
528
529    def _obtain_benchmark_result(self):
530        benchmark_root_dir = os.path.abspath(
531            os.path.join(self.result_rootpath, "benchmark"))
532        benchmark_dir = os.path.abspath(
533            os.path.join(benchmark_root_dir,
534                         self.get_result_sub_save_path(),
535                         self.testsuite_name))
536
537        if not os.path.exists(benchmark_dir):
538            os.makedirs(benchmark_dir)
539
540        LOG.info("benchmark_dir = %s" % benchmark_dir)
541        self.device.pull_file(os.path.join(self.device_testpath,
542                                           "%s.json" % self.testsuite_name), benchmark_dir)
543        if not os.path.exists(os.path.join(benchmark_dir,
544                                           "%s.json" % self.testsuite_name)):
545            os.rmdir(benchmark_dir)
546        return benchmark_dir
547
548
549@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
550class CppTestDriver(IDriver):
551    """
552    CppTest is a Test that runs a native test package on given device.
553    """
554    # test driver config
555    config = None
556    result = ""
557
558    def __check_environment__(self, device_options):
559        if len(device_options) == 1 and device_options[0].label is None:
560            return True
561        if len(device_options) != 1 or \
562                device_options[0].label != DeviceLabelType.phone:
563            return False
564        return True
565
566    def __check_config__(self, config):
567        pass
568
569    def __result__(self):
570        return self.result if os.path.exists(self.result) else ""
571
572    def __execute__(self, request):
573        try:
574            self.config = request.config
575            self.config.target_test_path = DEFAULT_TEST_PATH
576            self.config.device = request.config.environment.devices[0]
577
578            suite_file = request.root.source.source_file
579            LOG.debug("Testsuite FilePath: %s" % suite_file)
580
581            if not suite_file:
582                LOG.error("test source '%s' not exists" %
583                          request.root.source.source_string)
584                return
585
586            if not self.config.device:
587                result = ResultManager(suite_file, self.config)
588                result.set_is_coverage(False)
589                result.make_empty_result_file(
590                    "No test device is found. ")
591                return
592
593            self.config.device.set_device_report_path(request.config.report_path)
594            if request.config.hilogswitch != "0":
595                self.config.device.device_log_collector.start_hilog_task()
596            self._init_gtest()
597            self._run_gtest(suite_file)
598
599        finally:
600            log_path = get_result_savepath(request.root.source.source_file, request.config.report_path)
601            suit_name = os.path.basename(request.root.source.source_file)
602            xml_path = os.path.join(log_path, f"{suit_name}.xml")
603            if not os.path.exists(xml_path):
604                _create_empty_result_file(xml_path, suit_name, "ERROR")
605            serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns())
606            log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace(
607                ":", "_"))
608            try:
609                if request.config.hilogswitch != "0":
610                    self.config.device.device_log_collector.stop_hilog_task(
611                        log_tar_file_name, module_name=request.get_module_name())
612            finally:
613                update_xml(request.root.source.source_file, xml_path)
614
615    @staticmethod
616    def _alter_init(name):
617        with open(name, "rb") as f:
618            lines = f.read()
619        str_content = lines.decode("utf-8")
620
621        pattern_sharp = '^\s*#.*$(\n|\r\n)'
622        pattern_star = '/\*.*?\*/(\n|\r\n)+'
623        pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
624
625        if re.match(pattern_sharp, str_content, flags=re.M):
626            striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
627        elif re.findall(pattern_star, str_content, flags=re.S):
628            striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
629        elif re.findall(pattern_xml, str_content, flags=re.S):
630            striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
631        else:
632            striped_content = str_content
633
634        striped_bt = striped_content.encode("utf-8")
635        if os.path.exists(name):
636            os.remove(name)
637        with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
638            f.write(striped_bt)
639
640    def _init_gtest(self):
641        self.config.device.connector_command("target mount")
642        self.config.device.execute_shell_command(
643            "rm -rf %s" % self.config.target_test_path)
644        self.config.device.execute_shell_command(
645            "mkdir -p %s" % self.config.target_test_path)
646        self.config.device.execute_shell_command(
647            "mount -o rw,remount,rw /")
648        if "fuzztest" == self.config.testtype[0]:
649            self.config.device.execute_shell_command(
650                "mkdir -p %s" % os.path.join(self.config.target_test_path,
651                                             "corpus"))
652
653    def _gtest_command(self, suite_file):
654        filename = os.path.basename(suite_file)
655        test_para = self._get_test_para(self.config.testcase,
656                                        self.config.testlevel,
657                                        self.config.testtype,
658                                        self.config.target_test_path,
659                                        suite_file,
660                                        filename,
661                                        self.config.iteration)
662
663        # execute testcase
664        if not self.config.coverage:
665            if self.config.random == "random":
666                seed = random.randint(1, 100)
667                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % (
668                    self.config.target_test_path,
669                    filename,
670                    filename,
671                    test_para,
672                    seed)
673            else:
674                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
675                    self.config.target_test_path,
676                    filename,
677                    filename,
678                    test_para)
679        else:
680            coverage_outpath = self.config.coverage_outpath
681            if coverage_outpath:
682                strip_num = len(coverage_outpath.strip("/").split("/"))
683            else:
684                ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json")
685                with open(ohos_config_path, 'r') as json_file:
686                    json_info = json.load(json_file)
687                    out_path = json_info.get("out_path")
688                strip_num = len(out_path.strip("/").split("/"))
689            if "fuzztest" == self.config.testtype[0]:
690                self._push_corpus_cov_if_exist(suite_file)
691                command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \
692                                        rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \
693                                        GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}"
694            else:
695                command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \
696                          "GCOV_PREFIX_STRIP=%s ./%s %s" % \
697                          (self.config.target_test_path,
698                           filename,
699                           DEFAULT_TEST_PATH,
700                           str(strip_num),
701                           filename,
702                           test_para)
703
704        if self.config.hidelog:
705            command += " > {}.log 2>&1".format(filename)
706
707        return command
708
709    def _run_gtest(self, suite_file):
710        from xdevice import Variables
711        is_coverage_test = True if self.config.coverage else False
712
713        # push testsuite file
714        self.config.device.push_file(suite_file, self.config.target_test_path)
715        self.config.device.execute_shell_command(
716            "hilog -d %s" % (os.path.join(self.config.target_test_path,
717                                          os.path.basename(suite_file)))
718        )
719        self._push_corpus_if_exist(suite_file)
720
721        # push resource files
722        resource_manager = ResourceManager()
723        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
724        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
725                                               self.config.device)
726
727        command = self._gtest_command(suite_file)
728
729        result = ResultManager(suite_file, self.config)
730        result.set_is_coverage(is_coverage_test)
731
732        try:
733            # get result
734            if self.config.hidelog:
735                return_message = ""
736                display_receiver = CollectingOutputReceiver()
737                self.config.device.execute_shell_command(
738                    command,
739                    receiver=display_receiver,
740                    timeout=TIME_OUT,
741                    retry=0)
742            else:
743                display_receiver = DisplayOutputReceiver()
744                self.config.device.execute_shell_command(
745                    command,
746                    receiver=display_receiver,
747                    timeout=TIME_OUT,
748                    retry=0)
749                return_message = display_receiver.output
750        except (ExecuteTerminate, DeviceError) as exception:
751            return_message = str(exception.args)
752
753        if self.config.hidelog:
754            self.result = result.get_test_results_hidelog(return_message)
755        else:
756            self.result = result.get_test_results(return_message)
757
758        resource_manager.process_cleaner_data(resource_data_dic,
759                                              resource_dir,
760                                              self.config.device)
761
762    def _push_corpus_cov_if_exist(self, suite_file):
763        corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep)
764        cov_file = os.path.join(
765            sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz")
766        LOG.info("corpus_cov file :%s" % str(cov_file))
767        self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path))
768
769    def _push_corpus_if_exist(self, suite_file):
770        if "fuzztest" == self.config.testtype[0]:
771            corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus")
772            if not os.path.isdir(corpus_path):
773                return
774
775            corpus_dirs = []
776            corpus_file_list = []
777
778            for root, _, files in os.walk(corpus_path):
779                if not files:
780                    continue
781
782                corpus_dir = root.split("corpus")[-1]
783                if corpus_dir != "":
784                    corpus_dirs.append(corpus_dir)
785
786                for file in files:
787                    cp_file = os.path.normcase(os.path.join(root, file))
788                    corpus_file_list.append(cp_file)
789                    if file == "init":
790                        self._alter_init(cp_file)
791
792            # mkdir corpus files dir
793            if corpus_dirs:
794                for corpus in corpus_dirs:
795                    mkdir_corpus_command = f"shell; mkdir -p {corpus}"
796                    self.config.device.connector_command(mkdir_corpus_command)
797
798            # push corpus file
799            if corpus_file_list:
800                for corpus_file in corpus_file_list:
801                    self.config.device.push_file(corpus_file,
802                                                 os.path.join(self.config.target_test_path, "corpus"))
803
804    def _get_test_para(self,
805                       testcase,
806                       testlevel,
807                       testtype,
808                       target_test_path,
809                       suite_file,
810                       filename,
811                       iteration):
812        if "benchmark" == testtype[0]:
813            test_para = (" --benchmark_out_format=json"
814                         " --benchmark_out=%s%s.json") % (
815                            target_test_path, filename)
816            return test_para
817
818        if "" != testcase and "" == testlevel:
819            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
820        elif "" == testcase and "" != testlevel:
821            level_para = get_level_para_string(testlevel)
822            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
823        else:
824            test_para = ""
825
826        if iteration:
827            test_para += f" --gtest_repeat={iteration}"
828
829        if "fuzztest" == testtype[0]:
830            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
831                suite_file), "project.xml")).get_fuzzer_config("fuzztest")
832            LOG.info("config list :%s" % str(cfg_list))
833            if self.config.coverage:
834                test_para += "corpus -runs=0" + \
835                             " -max_len=" + cfg_list[0] + \
836                             " -max_total_time=" + cfg_list[1] + \
837                             " -rss_limit_mb=" + cfg_list[2]
838            else:
839                test_para += "corpus -max_len=" + cfg_list[0] + \
840                             " -max_total_time=" + cfg_list[1] + \
841                             " -rss_limit_mb=" + cfg_list[2]
842
843        return test_para
844
845
846@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
847class JSUnitTestDriver(IDriver):
848    """
849    JSUnitTestDriver is a Test that runs a native test package on given device.
850    """
851
852    def __init__(self):
853        self.config = None
854        self.result = ""
855        self.start_time = None
856        self.ability_name = ""
857        self.package_name = ""
858        # log
859        self.hilog = None
860        self.hilog_proc = None
861
862    def __check_environment__(self, device_options):
863        pass
864
865    def __check_config__(self, config):
866        pass
867
868    def __result__(self):
869        return self.result if os.path.exists(self.result) else ""
870
871    def __execute__(self, request):
872        try:
873            LOG.info("developer_test driver")
874            self.config = request.config
875            self.config.target_test_path = DEFAULT_TEST_PATH
876            self.config.device = request.config.environment.devices[0]
877
878            suite_file = request.root.source.source_file
879            result_save_path = get_result_savepath(suite_file, self.config.report_path)
880            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
881            if not suite_file:
882                LOG.error("test source '%s' not exists" %
883                          request.root.source.source_string)
884                return
885
886            if not self.config.device:
887                result = ResultManager(suite_file, self.config)
888                result.set_is_coverage(False)
889                result.make_empty_result_file(
890                    "No test device is found")
891                return
892
893            package_name, ability_name = self._get_package_and_ability_name(
894                suite_file)
895            self.package_name = package_name
896            self.ability_name = ability_name
897            self.config.test_hap_out_path = \
898                "/data/data/%s/files/" % self.package_name
899            self.config.device.connector_command("shell hilog -r")
900
901            self.hilog = get_device_log_file(
902                request.config.report_path,
903                request.config.device.__get_serial__() + "_" + request.
904                get_module_name(),
905                "device_hilog")
906
907            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
908                                 0o755)
909
910            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
911                self.config.device.device_log_collector.add_log_address(None, self.hilog)
912                _, self.hilog_proc = self.config.device.device_log_collector.\
913                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
914                self._init_jsunit_test()
915                self._run_jsunit(suite_file, self.hilog)
916                hilog_file_pipe.flush()
917                self.generate_console_output(self.hilog, request)
918                xml_path = os.path.join(
919                    request.config.report_path, "result",
920                    '.'.join((request.get_module_name(), "xml")))
921                shutil.move(xml_path, self.result)
922        finally:
923            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
924            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
925            update_xml(request.root.source.source_file, self.result)
926
927    @staticmethod
928    def _get_acts_test_para(testcase,
929                            testlevel,
930                            testtype,
931                            target_test_path,
932                            suite_file,
933                            filename):
934        if "actstest" == testtype[0]:
935            test_para = (" --actstest_out_format=json"
936                         " --actstest_out=%s%s.json") % (
937                            target_test_path, filename)
938            return test_para
939
940        if "" != testcase and "" == testlevel:
941            test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
942        elif "" == testcase and "" != testlevel:
943            level_para = get_level_para_string(testlevel)
944            test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
945        else:
946            test_para = ""
947        return test_para
948
949    @staticmethod
950    def _get_hats_test_para(testcase,
951                            testlevel,
952                            testtype,
953                            target_test_path,
954                            suite_file,
955                            filename):
956        if "hatstest" == testtype[0]:
957            test_hats_para = (" --hatstest_out_format=json"
958                         " --hatstest_out=%s%s.json") % (
959                            target_test_path, filename)
960            return test_hats_para
961
962        if "" != testcase and "" == testlevel:
963            test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
964        elif "" == testcase and "" != testlevel:
965            level_para = get_level_para_string(testlevel)
966            test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
967        else:
968            test_hats_para = ""
969        return test_hats_para
970
971    @classmethod
972    def _get_json_shell_timeout(cls, json_filepath):
973        test_timeout = 0
974        try:
975            with open(json_filepath, 'r') as json_file:
976                data_dic = json.load(json_file)
977                if not data_dic:
978                    return test_timeout
979                else:
980                    if "driver" in data_dic.keys():
981                        driver_dict = data_dic.get("driver")
982                        if driver_dict and "test-timeout" in driver_dict.keys():
983                            test_timeout = int(driver_dict["shell-timeout"]) / 1000
984                    return test_timeout
985        except JSONDecodeError:
986            return test_timeout
987        finally:
988            print(" get json shell timeout finally")
989
990    @staticmethod
991    def _get_package_and_ability_name(hap_filepath):
992        package_name = ""
993        ability_name = ""
994        if os.path.exists(hap_filepath):
995            filename = os.path.basename(hap_filepath)
996
997            # unzip the hap file
998            hap_bak_path = os.path.abspath(os.path.join(
999                os.path.dirname(hap_filepath),
1000                "%s.bak" % filename))
1001            zf_desc = zipfile.ZipFile(hap_filepath)
1002            try:
1003                zf_desc.extractall(path=hap_bak_path)
1004            except RuntimeError as error:
1005                print("Unzip error: ", hap_bak_path)
1006            zf_desc.close()
1007
1008            # verify config.json file
1009            app_profile_path = os.path.join(hap_bak_path, "config.json")
1010            if not os.path.exists(app_profile_path):
1011                print("file %s not exist" % app_profile_path)
1012                return package_name, ability_name
1013
1014            if os.path.isdir(app_profile_path):
1015                print("%s is a folder, and not a file" % app_profile_path)
1016                return package_name, ability_name
1017
1018            # get package_name and ability_name value
1019            load_dict = {}
1020            with open(app_profile_path, 'r') as load_f:
1021                load_dict = json.load(load_f)
1022            profile_list = load_dict.values()
1023            for profile in profile_list:
1024                package_name = profile.get("package")
1025                if not package_name:
1026                    continue
1027                abilities = profile.get("abilities")
1028                for abilitie in abilities:
1029                    abilities_name = abilitie.get("name")
1030                    if abilities_name.startswith("."):
1031                        ability_name = package_name + abilities_name[
1032                                                      abilities_name.find("."):]
1033                    else:
1034                        ability_name = abilities_name
1035                    break
1036                break
1037
1038            # delete hap_bak_path
1039            if os.path.exists(hap_bak_path):
1040                shutil.rmtree(hap_bak_path)
1041        else:
1042            print("file %s not exist" % hap_filepath)
1043        return package_name, ability_name
1044
1045    def generate_console_output(self, device_log_file, request):
1046        result_message = self.read_device_log(device_log_file)
1047
1048        report_name = request.get_module_name()
1049        parsers = get_plugin(
1050            Plugin.PARSER, CommonParserType.jsunit)
1051        if parsers:
1052            parsers = parsers[:1]
1053        for listener in request.listeners:
1054            listener.device_sn = self.config.device.device_sn
1055        parser_instances = []
1056
1057        for parser in parsers:
1058            parser_instance = parser.__class__()
1059            parser_instance.suites_name = report_name
1060            parser_instance.suite_name = report_name
1061            parser_instance.listeners = request.listeners
1062            parser_instances.append(parser_instance)
1063        handler = ShellHandler(parser_instances)
1064        process_command_ret(result_message, handler)
1065
1066    def read_device_log(self, device_log_file):
1067        result_message = ""
1068        with open(device_log_file, "r", encoding='utf-8',
1069                  errors='ignore') as file_read_pipe:
1070            while True:
1071                data = file_read_pipe.readline()
1072                if not data:
1073                    break
1074                # only filter JSApp log
1075                if data.lower().find(_ACE_LOG_MARKER) != -1:
1076                    result_message += data
1077                    if data.find("[end] run suites end") != -1:
1078                        break
1079        return result_message
1080
1081    def start_hap_execute(self):
1082        try:
1083            command = "aa start -d 123 -a %s.MainAbility -b %s" \
1084                      % (self.package_name, self.package_name)
1085            self.start_time = time.time()
1086            result_value = self.config.device.execute_shell_command(
1087                command, timeout=TIME_OUT)
1088
1089            if "success" in str(result_value).lower():
1090                LOG.info("execute %s's testcase success. result value=%s"
1091                         % (self.package_name, result_value))
1092            else:
1093                LOG.info("execute %s's testcase failed. result value=%s"
1094                         % (self.package_name, result_value))
1095
1096            _sleep_according_to_result(result_value)
1097            return_message = result_value
1098        except (ExecuteTerminate, DeviceError) as exception:
1099            return_message = exception.args
1100
1101        return return_message
1102
1103    def _init_jsunit_test(self):
1104        self.config.device.connector_command("target mount")
1105        self.config.device.execute_shell_command(
1106            "rm -rf %s" % self.config.target_test_path)
1107        self.config.device.execute_shell_command(
1108            "mkdir -p %s" % self.config.target_test_path)
1109        self.config.device.execute_shell_command(
1110            "mount -o rw,remount,rw /")
1111
1112    def _run_jsunit(self, suite_file, device_log_file):
1113        filename = os.path.basename(suite_file)
1114        _, suffix_name = os.path.splitext(filename)
1115
1116        resource_manager = ResourceManager()
1117        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
1118        if suffix_name == ".hap":
1119            json_file_path = suite_file.replace(".hap", ".json")
1120            if os.path.exists(json_file_path):
1121                timeout = self._get_json_shell_timeout(json_file_path)
1122            else:
1123                timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1124        else:
1125            timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1126        resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device)
1127        main_result = self._install_hap(suite_file)
1128        result = ResultManager(suite_file, self.config)
1129        if main_result:
1130            self._execute_hapfile_jsunittest()
1131            try:
1132                status = False
1133                actiontime = JS_TIMEOUT
1134                times = CYCLE_TIMES
1135                if timeout:
1136                    actiontime = timeout
1137                    times = 1
1138                with open(device_log_file, "r", encoding='utf-8',
1139                          errors='ignore') as file_read_pipe:
1140                    for i in range(0, times):
1141                        if status:
1142                            break
1143                        else:
1144                            time.sleep(float(actiontime))
1145                        start_time = int(time.time())
1146                        while True:
1147                            data = file_read_pipe.readline()
1148                            if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1:
1149                                LOG.info("execute testcase successfully.")
1150                                status = True
1151                                break
1152                            if int(time.time()) - start_time > 5:
1153                                break
1154            finally:
1155                _lock_screen(self.config.device)
1156                self._uninstall_hap(self.package_name)
1157        else:
1158            self.result = result.get_test_results("Error: install hap failed")
1159            LOG.error("Error: install hap failed")
1160
1161        resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device)
1162
1163    def _execute_hapfile_jsunittest(self):
1164        _unlock_screen(self.config.device)
1165        _unlock_device(self.config.device)
1166
1167        try:
1168            return_message = self.start_hap_execute()
1169        except (ExecuteTerminate, DeviceError) as exception:
1170            return_message = str(exception.args)
1171
1172        return return_message
1173
1174    def _install_hap(self, suite_file):
1175        message = self.config.device.connector_command("install %s" % suite_file)
1176        message = str(message).rstrip()
1177        if message == "" or "success" in message:
1178            return_code = True
1179            if message != "":
1180                LOG.info(message)
1181        else:
1182            return_code = False
1183            if message != "":
1184                LOG.warning(message)
1185
1186        _sleep_according_to_result(return_code)
1187        return return_code
1188
1189    def _uninstall_hap(self, package_name):
1190        return_message = self.config.device.execute_shell_command(
1191            "bm uninstall -n %s" % package_name)
1192        _sleep_according_to_result(return_message)
1193        return return_message
1194
1195
1196@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
1197class OHRustTestDriver(IDriver):
1198    def __init__(self):
1199        self.result = ""
1200        self.error_message = ""
1201        self.config = None
1202
1203    def __check_environment__(self, device_options):
1204        pass
1205
1206    def __check_config__(self, config):
1207        pass
1208
1209    def __execute__(self, request):
1210        try:
1211            LOG.debug("Start to execute open harmony rust test")
1212            self.config = request.config
1213            self.config.device = request.config.environment.devices[0]
1214            self.config.target_test_path = DEFAULT_TEST_PATH
1215            suite_file = request.root.source.source_file
1216            LOG.debug("Testsuite filepath:{}".format(suite_file))
1217
1218            if not suite_file:
1219                LOG.error("test source '{}' not exists".format(
1220                    request.root.source.source_string))
1221                return
1222
1223            result_save_path = get_result_savepath(suite_file, self.config.report_path)
1224            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
1225            self.config.device.set_device_report_path(request.config.report_path)
1226            self.config.device.device_log_collector.start_hilog_task()
1227            self._init_oh_rust()
1228            self._run_oh_rust(suite_file, request)
1229        except Exception as exception:
1230            self.error_message = exception
1231            if not getattr(exception, "error_no", ""):
1232                setattr(exception, "error_no", "03409")
1233            LOG.exception(self.error_message, exc_info=False, error_no="03409")
1234        finally:
1235            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
1236                                    time.time_ns())
1237            log_tar_file_name = "{}_{}".format(
1238                request.get_module_name(), str(serial).replace(":", "_"))
1239            try:
1240                self.config.device.device_log_collector.stop_hilog_task(
1241                    log_tar_file_name, module_name=request.get_module_name())
1242            finally:
1243                xml_path = os.path.join(
1244                    request.config.report_path, "result",
1245                    '.'.join((request.get_module_name(), "xml")))
1246                shutil.move(xml_path, self.result)
1247                update_xml(request.root.source.source_file, self.result)
1248                self.result = check_result_report(
1249                    request.config.report_path, self.result, self.error_message)
1250
1251    def __result__(self):
1252        return self.result if os.path.exists(self.result) else ""
1253
1254    def _init_oh_rust(self):
1255        self.config.device.connector_command("target mount")
1256        self.config.device.execute_shell_command(
1257            "rm -rf %s" % self.config.target_test_path)
1258        self.config.device.execute_shell_command(
1259            "mkdir -p %s" % self.config.target_test_path)
1260        self.config.device.execute_shell_command(
1261            "mount -o rw,remount,rw /")
1262
1263    def _run_oh_rust(self, suite_file, request=None):
1264        self.config.device.push_file(suite_file, self.config.target_test_path)
1265        self.config.device.execute_shell_command(
1266            "hilog -d %s" % (os.path.join(self.config.target_test_path,
1267                                          os.path.basename(suite_file)))
1268        )
1269        resource_manager = ResourceManager()
1270        resource_data_dict, resource_dir = \
1271            resource_manager.get_resource_data_dic(suite_file)
1272        resource_manager.process_preparer_data(resource_data_dict,
1273                                               resource_dir,
1274                                               self.config.device)
1275        for listener in request.listeners:
1276            listener.device_sn = self.config.device.device_sn
1277
1278        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
1279        if parsers:
1280            parsers = parsers[:1]
1281        parser_instances = []
1282        for parser in parsers:
1283            parser_instance = parser.__class__()
1284            parser_instance.suite_name = request.get_module_name()
1285            parser_instance.listeners = request.listeners
1286            parser_instances.append(parser_instance)
1287        handler = ShellHandler(parser_instances)
1288        if self.config.coverage:
1289            command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format(
1290                self.config.target_test_path, os.path.basename(suite_file))
1291        else:
1292            command = "cd {}; chmod +x *; ./{}".format(
1293                self.config.target_test_path, os.path.basename(suite_file))
1294        self.config.device.execute_shell_command(
1295            command, timeout=TIME_OUT, receiver=handler, retry=0)
1296        if self.config.coverage:
1297            result = ResultManager(suite_file, self.config)
1298            result.obtain_coverage_data()
1299        resource_manager.process_cleaner_data(resource_data_dict, resource_dir,
1300                                              self.config.device)
1301