• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import re
22import shutil
23import subprocess
24import sys
25import time
26import platform
27import zipfile
28import stat
29import random
30import xml.etree.ElementTree as ET
31from dataclasses import dataclass
32from json import JSONDecodeError
33
34from xdevice import DeviceTestType, check_result_report
35from xdevice import DeviceLabelType
36from xdevice import CommonParserType
37from xdevice import ExecuteTerminate
38from xdevice import DeviceError
39from xdevice import ShellHandler
40
41from xdevice import IDriver
42from xdevice import platform_logger
43from xdevice import Plugin
44from xdevice import get_plugin
45from ohos.environment.dmlib import process_command_ret
46from core.utils import get_decode
47from core.utils import get_fuzzer_path
48from core.config.resource_manager import ResourceManager
49from core.config.config_manager import FuzzerConfigManager
50
# Names exported by a star-import of this module.
__all__ = [
    "CppTestDriver",
    "JSUnitTestDriver",
    "disable_keyguard",
    "GTestConst"]

# Module-wide logger obtained from xdevice under the name "Drivers".
LOG = platform_logger("Drivers")
# Working directory on the device ("/data/test/"); the cpp driver pushes the
# suite there and runs it from that directory.
DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
# Name of the device directory that is tarred and pulled as coverage data.
OBJ = "obj"
_ACE_LOG_MARKER = " a0c0d0"
# Timeout handed to device shell commands and file pulls
# (presumably milliseconds, i.e. 15 minutes -- TODO confirm the unit).
TIME_OUT = 900 * 1000
JS_TIMEOUT = 10
CYCLE_TIMES = 50

# os.open() flags used for generated files: write-only, create the file and
# fail if it already exists (O_EXCL).
FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL
# Permission bits for generated files: read/write for the owner only.
MODES = stat.S_IWUSR | stat.S_IRUSR
67
68
class CollectingOutputReceiver:
    """Receiver that silently accumulates everything it is given.

    The accumulated text is available through the ``output`` attribute.
    """

    def __init__(self):
        self.output = ""

    def __read__(self, output):
        """Append ``output`` (converted with ``str``) to the collected text."""
        self.output = f"{self.output!s}{output!s}"

    def __error__(self, message):
        """Ignore error notifications."""
        return None

    def __done__(self, result_code="", message=""):
        """Ignore the completion notification."""
        return None
81
82
class DisplayOutputReceiver:
    """Receiver that accumulates output and logs each completed line.

    Text that does not yet end with the line terminator is buffered in
    ``unfinished_line`` and prepended to the next chunk.
    """

    def __init__(self):
        self.output = ""
        self.unfinished_line = ""

    def __read__(self, output):
        """Record ``output`` and log every complete, non-blank line in it."""
        self.output = f"{self.output!s}{output!s}"
        for raw_line in self._process_output(output):
            text = raw_line.strip()
            if not text:
                continue
            LOG.info(get_decode(text))

    def __error__(self, message):
        """Ignore error notifications."""
        return None

    def __done__(self, result_code="", message=""):
        """Ignore the completion notification."""
        return None

    def _process_output(self, output, end_mark="\n"):
        """Split ``output`` into complete lines.

        Any previously buffered partial line is glued in front of ``output``
        first. The trailing fragment after the last ``end_mark`` (if any) is
        buffered for the next call and is not part of the returned list.
        """
        content = output
        if self.unfinished_line:
            content = self.unfinished_line + content
            self.unfinished_line = ""
        pieces = content.split(end_mark)
        tail = pieces.pop()
        if not content.endswith(end_mark):
            # The last piece is an incomplete line; keep it for later.
            self.unfinished_line = tail
        return pieces
113
114
@dataclass
class GTestConst:
    """Command-line option names used to pass filter and level to test binaries."""
    # Options carrying the "gtest" prefix.
    exec_para_filter = "--gtest_filter"
    exec_para_level = "--gtest_testsize"
    # Options carrying the "jstest" prefix.
    exec_acts_para_filter = "--jstest_filter"
    exec_acts_para_level = "--jstest_testsize"
121
122
def get_device_log_file(report_path, serial=None, log_name="device_log"):
    """Return the path of a device log file inside the report's log directory.

    The log directory (``Variables.report_vars.log_dir`` below
    ``report_path``) is created when missing. The file is named
    ``<log_name>_<serial>.log``; when ``serial`` is falsy the current
    ``time.time_ns()`` value is used in its place.
    """
    from xdevice import Variables

    log_dir = os.path.join(report_path, Variables.report_vars.log_dir)
    os.makedirs(log_dir, exist_ok=True)

    if not serial:
        serial = time.time_ns()
    return os.path.join(log_dir, "{}_{}.log".format(log_name, serial))
132
133
def get_level_para_string(level_string):
    """Convert a comma-separated list of levels into ``LevelN`` form.

    Each item is stripped of surrounding whitespace *before* it is checked,
    so input such as ``"1, 2"`` yields ``"Level1,Level2"`` (previously the
    padded item was discarded). Items that are not purely digits are
    ignored, duplicates are collapsed, and the order of first appearance is
    kept so the result is deterministic.

    :param level_string: e.g. ``"0,1,2"``.
    :return: e.g. ``"Level0,Level1,Level2"``; ``""`` when nothing is valid.
    """
    levels = []
    for raw_item in level_string.split(","):
        item = raw_item.strip()
        if item.isdigit() and item not in levels:
            levels.append(item)
    return ",".join(f"Level{item}" for item in levels)
144
145
def get_result_savepath(testsuit_path, result_rootpath):
    """Return (and create) the local directory for a suite's result files.

    The directory is ``<result_rootpath>/result``. When the suite's directory
    contains a ``tests`` path component followed by at least two more
    components, everything after the first component that follows ``tests``
    is appended as a sub-path.

    :param testsuit_path: local path of the test suite file.
    :param result_rootpath: root directory of the report.
    :return: the result directory path (it exists on return).
    """
    findkey = os.sep + "tests" + os.sep
    filedir, _ = os.path.split(testsuit_path)

    result_path = os.path.join(result_rootpath, "result")
    pos = filedir.find(findkey)
    if pos != -1:
        subpath = filedir[pos + len(findkey):]
        pos1 = subpath.find(os.sep)
        if pos1 != -1:
            subpath = subpath[pos1 + len(os.sep):]
            result_path = os.path.join(result_rootpath, "result", subpath)

    # exist_ok avoids the check-then-create race when several suites share
    # the same result directory.
    os.makedirs(result_path, exist_ok=True)

    LOG.info("result_savepath = " + result_path)
    return result_path
166
167
def get_test_log_savepath(result_rootpath, result_suit_path):
    """Return (and create) the local directory for a suite's test log.

    The directory is ``<result_rootpath>/log/test_log/<first component>``
    where ``<first component>`` is the first path component that follows the
    last occurrence of the text ``result`` in ``result_suit_path``.

    :param result_rootpath: root directory of the report.
    :param result_suit_path: result directory of the suite.
    :return: the log directory path (it exists on return).
    """
    suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0]
    test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path)

    # exist_ok avoids the check-then-create race between concurrent callers.
    os.makedirs(test_log_path, exist_ok=True)

    LOG.info("test_log_savepath = {}".format(test_log_path))
    return test_log_path
177
178
def update_xml(suite_file, result_xml):
    """Annotate a result XML with the suite's path, name and crash flag.

    Nothing happens unless both ``<suite>_path.txt`` (``suite_file`` cut at
    its first ``.``) and ``result_xml`` exist. Otherwise the root element
    receives:

    * ``path``  - first line of the ``_path.txt`` file with ``//`` removed,
    * ``name``  - base name of the suite without extension,
    * ``is_crash="1"`` - only when ``<report>/log/<suite>`` contains a
      directory whose name starts with ``crash_log_<suite>``.

    A missing ``<report>/log/<suite>`` directory is treated as "no crash"
    instead of raising, so the XML is still updated.

    :param suite_file: local path of the test suite file.
    :param result_xml: local path of the suite's result XML.
    """
    # NOTE(review): cutting at the first "." also truncates paths whose
    # directories contain dots -- confirm how the *_path.txt file is named.
    suite_stem = suite_file.split(".")[0]
    suite_path_txt = suite_stem + "_path.txt"
    if not (os.path.exists(suite_path_txt) and os.path.exists(result_xml)):
        return

    with open(suite_path_txt, "r") as path_text:
        line = path_text.readline().replace("//", "").strip()
    tree = ET.parse(result_xml)
    root = tree.getroot()
    root.attrib["path"] = line
    root.attrib["name"] = os.path.basename(suite_stem)

    test_suit = os.path.basename(result_xml).split(".")[0]
    # The report root is whatever precedes the first "result" in the path.
    log_path = os.path.abspath(result_xml).split("result")[0]
    crash_path = os.path.join(log_path, "log", test_suit)

    # Look for directories named crash_log_<suite>*; a suite without any log
    # directory simply has no crash logs.
    crash_prefix = f"crash_log_{test_suit}"
    has_crash_dir = os.path.isdir(crash_path) and any(
        item.startswith(crash_prefix)
        and os.path.isdir(os.path.join(crash_path, item))
        for item in os.listdir(crash_path))
    if has_crash_dir:
        root.attrib["is_crash"] = "1"
    tree.write(result_xml)
200
201
# Placeholder result XML shared by all test suites when a suite is unavailable
def _create_empty_result_file(filepath, filename, error_message):
    """Write a placeholder result XML marking the suite as unavailable.

    Nothing is written when ``filepath`` already exists. The characters
    ``"``, ``<``, ``>`` and ``&`` are removed from ``error_message`` so it can
    be embedded as an XML attribute value. For ``.hap`` suites the name is cut
    at its first ``.``.

    The file is written as UTF-8 to match the encoding named in its XML
    declaration, regardless of the platform's default encoding.

    :param filepath: path of the XML file to create.
    :param filename: suite name stored in the ``name`` attributes.
    :param error_message: reason stored in the ``message`` attribute.
    """
    error_message = str(error_message)
    for forbidden in ('"', "<", ">", "&"):
        error_message = error_message.replace(forbidden, "")
    if filename.endswith(".hap"):
        filename = filename.split(".")[0]
    if os.path.exists(filepath):
        return
    with os.fdopen(os.open(filepath, FLAGS, MODES), 'w',
                   encoding="utf-8") as file_desc:
        time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
                                   time.localtime())
        file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        file_desc.write(
            '<testsuites tests="0" failures="0" '
            'disabled="0" errors="0" timestamp="%s" '
            'time="0" name="%s" unavailable="1">\n' % (time_stamp, filename))
        file_desc.write(
            '  <testsuite name="%s" tests="0" failures="0" '
            'disabled="0" errors="0" time="0.0" '
            'unavailable="1" message="%s">\n' %
            (filename, error_message))
        file_desc.write('  </testsuite>\n')
        file_desc.write('</testsuites>\n')
228
229
def _unlock_screen(device):
    """Run ``svc power stayon true`` on the device, then wait one second."""
    stay_on_command = "svc power stayon true"
    device.execute_shell_command(stay_on_command)
    time.sleep(1)
233
234
def _unlock_device(device):
    """Send key event 82 and dismiss the keyguard, pausing 1s after each."""
    for shell_command in ("input keyevent 82", "wm dismiss-keyguard"):
        device.execute_shell_command(shell_command)
        time.sleep(1)
240
241
def _lock_screen(device):
    """Run ``svc power stayon false`` on the device, then wait one second."""
    stay_off_command = "svc power stayon false"
    device.execute_shell_command(stay_off_command)
    time.sleep(1)
245
246
def disable_keyguard(device):
    """Keep the device screen on and then dismiss its keyguard."""
    for step in (_unlock_screen, _unlock_device):
        step(device)
250
251
252def _sleep_according_to_result(result):
253    if result:
254        time.sleep(1)
255
256
def _create_fuzz_crash_file(filepath, filename):
    """Write a result XML reporting one failed case for a crashed fuzzer.

    Nothing is written when ``filepath`` already exists. The file is written
    as UTF-8 to match the encoding named in its XML declaration, regardless
    of the platform's default encoding.

    :param filepath: path of the XML file to create.
    :param filename: suite name used for the suite, case and class names.
    """
    if os.path.exists(filepath):
        return
    with os.fdopen(os.open(filepath, FLAGS, MODES), 'w',
                   encoding="utf-8") as file_desc:
        time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
                                   time.localtime())
        file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        file_desc.write(
            '<testsuites disabled="0" name="%s" '
            'time="300" timestamp="%s" errors="0" '
            'failures="1" tests="1">\n' % (filename, time_stamp))
        file_desc.write(
            '  <testsuite disabled="0" name="%s" time="300" '
            'errors="0" failures="1" tests="1">\n' % filename)
        file_desc.write(
            '    <testcase name="%s" time="300" classname="%s" '
            'status="run">\n' % (filename, filename))
        file_desc.write(
            '      <failure type="" '
            'message="Fuzzer crash. See ERROR in log file">\n')
        file_desc.write('      </failure>\n')
        file_desc.write('    </testcase>\n')
        file_desc.write('  </testsuite>\n')
        file_desc.write('</testsuites>\n')
281
282
def _create_fuzz_pass_file(filepath, filename):
    """Write a result XML reporting one passed case for a fuzzer run.

    Nothing is written when ``filepath`` already exists. The file is written
    as UTF-8 to match the encoding named in its XML declaration, regardless
    of the platform's default encoding.

    :param filepath: path of the XML file to create.
    :param filename: suite name used for the suite, case and class names.
    """
    if os.path.exists(filepath):
        return
    with os.fdopen(os.open(filepath, FLAGS, MODES), 'w',
                   encoding="utf-8") as file_desc:
        time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
                                   time.localtime())
        file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        file_desc.write(
            '<testsuites disabled="0" name="%s" '
            'time="300" timestamp="%s" errors="0" '
            'failures="0" tests="1">\n' % (filename, time_stamp))
        file_desc.write(
            '  <testsuite disabled="0" name="%s" time="300" '
            'errors="0" failures="0" tests="1">\n' % filename)
        file_desc.write(
            '    <testcase name="%s" time="300" classname="%s" '
            'status="run"/>\n' % (filename, filename))
        file_desc.write('  </testsuite>\n')
        file_desc.write('</testsuites>\n')
302
303
def _create_fuzz_result_file(filepath, filename, error_message):
    """Classify a fuzzer run from its output and write the matching result XML.

    The message is first stripped of ``"``, ``<``, ``>`` and ``&``. Then:

    * it mentions ``AddressSanitizer``      -> crash report,
    * it contains ``Done N runs in M second`` -> pass report,
    * anything else                          -> "unavailable" report.
    """
    message = str(error_message)
    for forbidden in ("\"", "<", ">", "&"):
        message = message.replace(forbidden, "")

    if "AddressSanitizer" in message:
        LOG.error("FUZZ TEST CRASH")
        _create_fuzz_crash_file(filepath, filename)
        return

    finished = re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
                         message, re.M)
    if finished is not None:
        LOG.info("FUZZ TEST PASS")
        _create_fuzz_pass_file(filepath, filename)
        return

    LOG.error("FUZZ TEST UNAVAILABLE")
    _create_empty_result_file(filepath, filename, message)
321
322
class ResultManager(object):
    """Fetch and post-process the result artifacts of one test suite.

    The manager pulls the result and log files a suite left on the device
    into the local report directory, writes a placeholder result when no
    result file is available, and collects fuzz corpus, benchmark and
    coverage data when the configuration asks for them.
    """

    def __init__(self, testsuit_path, config):
        """
        :param testsuit_path: local path of the test suite file.
        :param config: driver configuration. ``report_path`` and ``device``
            are read here, plus ``test_hap_out_path`` for ``.hap`` suites or
            ``target_test_path`` for every other suite.
        """
        self.testsuite_path = testsuit_path
        self.config = config
        self.result_rootpath = self.config.report_path
        self.device = self.config.device
        if testsuit_path.endswith(".hap"):
            self.device_testpath = self.config.test_hap_out_path
        else:
            self.device_testpath = self.config.target_test_path
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable coverage data collection for this suite."""
        self.is_coverage = is_coverage

    def get_test_results(self, error_message=""):
        """Pull the suite's result file and build the derived reports.

        :param error_message: command output or error text; used to classify
            fuzz runs and as the message of a placeholder result.
        :return: local path of the result file.
        """
        filepath, _ = self.obtain_test_result_file()
        return self._complete_results(filepath, error_message)

    def get_test_results_hidelog(self, error_message=""):
        """Like :meth:`get_test_results`, for runs whose output went to a log.

        When ``error_message`` is empty the pulled test log is read and its
        text is used as the message instead. The log is read as a single
        string (undecodable bytes are replaced) so the message that ends up
        in the report is the log text itself.

        :param error_message: error text that overrides the log content.
        :return: local path of the result file.
        """
        result_file_path, test_log_path = self.obtain_test_result_file()
        log_content = ""
        if error_message:
            log_content = error_message
        elif os.path.exists(test_log_path):
            with open(test_log_path, "r", errors="replace") as log:
                log_content = log.read()
        else:
            LOG.error("{}: Test log not exist.".format(test_log_path))
        return self._complete_results(result_file_path, log_content)

    def _complete_results(self, result_file_path, message):
        """Create missing reports and collect the optional extra artifacts."""
        if "fuzztest" == self.config.testtype[0]:
            LOG.info("create fuzz test report")
            _create_fuzz_result_file(result_file_path, self.testsuite_name,
                                     message)
            if not self.is_coverage:
                self._obtain_fuzz_corpus()

        if not os.path.exists(result_file_path):
            _create_empty_result_file(result_file_path, self.testsuite_name,
                                      message)
        if "benchmark" == self.config.testtype[0]:
            self._obtain_benchmark_result()
        # Get coverage data files
        if self.is_coverage:
            self.obtain_coverage_data()

        return result_file_path

    def get_result_sub_save_path(self):
        """Return the part of the suite directory that follows ``benchmark``.

        :return: the sub-path, or ``""`` when the suite directory has no
            ``benchmark`` path component.
        """
        find_key = os.sep + "benchmark" + os.sep
        file_dir, _ = os.path.split(self.testsuite_path)
        pos = file_dir.find(find_key)
        subpath = ""
        if pos != -1:
            subpath = file_dir[pos + len(find_key):]
        LOG.info("subpath = " + subpath)
        return subpath

    def obtain_test_result_file(self):
        """Pull the suite's result (and, with ``hidelog``, log) file.

        For non-fuzz suites the XML result is pulled when it exists on the
        device; otherwise the JSON result is pulled when that exists, and its
        local path is returned instead of the XML path.

        :return: ``(result_file_path, test_log_file_path)``; the second item
            is ``""`` unless ``config.hidelog`` is set.
        """
        result_save_path = get_result_savepath(self.testsuite_path,
                                               self.result_rootpath)
        xml_name = "%s.xml" % self.testsuite_name
        json_name = "%s.json" % self.testsuite_name

        result_file_path = os.path.join(result_save_path, xml_name)
        result_json_file_path = os.path.join(result_save_path, json_name)

        # hap suites use a fixed result file name on the device.
        if self.testsuite_path.endswith('.hap'):
            remote_xml_name = "testcase_result.xml"
        else:
            remote_xml_name = xml_name
        remote_result_file = os.path.join(self.device_testpath,
                                          remote_xml_name)
        remote_json_result_file = os.path.join(self.device_testpath,
                                               json_name)

        if self.config.testtype[0] != "fuzztest":
            if self.device.is_file_exist(remote_result_file):
                self.device.pull_file(remote_result_file, result_file_path)
            elif self.device.is_file_exist(remote_json_result_file):
                self.device.pull_file(remote_json_result_file,
                                      result_json_file_path)
                result_file_path = result_json_file_path
            else:
                LOG.info("%s not exist", remote_result_file)

        if self.config.hidelog:
            log_name = "%s.log" % self.testsuite_name
            remote_log_result_file = os.path.join(self.device_testpath,
                                                  log_name)
            test_log_save_path = get_test_log_savepath(self.result_rootpath,
                                                       result_save_path)
            test_log_file_path = os.path.join(test_log_save_path, log_name)
            self.device.pull_file(remote_log_result_file, test_log_file_path)
            return result_file_path, test_log_file_path

        return result_file_path, ""

    def make_empty_result_file(self, error_message=""):
        """Write a placeholder result for the suite unless a result exists."""
        result_savepath = get_result_savepath(self.testsuite_path,
                                              self.result_rootpath)
        result_filepath = os.path.join(result_savepath, "%s.xml" %
                                       self.testsuite_name)
        if not os.path.exists(result_filepath):
            _create_empty_result_file(result_filepath,
                                      self.testsuite_name, error_message)

    def is_exist_target_in_device(self, path, target):
        """Return True when ``ls -l <path> | grep <target>`` prints ``target``.

        On a Windows host the command is wrapped in double quotes.
        """
        if platform.system() == "Windows":
            command = '\"ls -l %s | grep %s\"' % (path, target)
        else:
            command = "ls -l %s | grep %s" % (path, target)

        stdout_info = self.device.execute_shell_command(command)
        return stdout_info != "" and stdout_info.find(target) != -1

    def obtain_coverage_data(self):
        """Pull the coverage directory from the device and unpack it locally.

        The device directory (``lib.unstripped`` for suites whose name starts
        with ``rust_``, ``obj`` otherwise) is tarred on the device, pulled to
        ``<report>/../coverage/data/cxx/<suite>_<test type>`` and extracted
        there; the extracted directory always ends up named ``obj``. Nothing
        happens when the directory is absent on the device.
        """
        tests_path = self.config.testcases_path
        test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0]
        cxx_cov_path = os.path.abspath(os.path.join(
            self.result_rootpath,
            "..",
            "coverage",
            "data",
            "cxx",
            self.testsuite_name + '_' + test_type))

        if os.path.basename(self.testsuite_name).startswith("rust_"):
            target_name = "lib.unstripped"
        else:
            target_name = OBJ
        if not self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
            return

        if not os.path.exists(cxx_cov_path):
            os.makedirs(cxx_cov_path)
        else:
            # Data from an earlier run is kept; use a timestamped sibling.
            cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}"
        self.config.device.execute_shell_command(
            "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name))
        src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name)
        self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT)
        tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)
        extracted_path = os.path.join(cxx_cov_path, target_name)
        obj_path = os.path.join(cxx_cov_path, OBJ)
        if platform.system() == "Windows":
            process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True)
            process.communicate()
            os.remove(tar_path)
            # Same guard as the non-Windows branch: only rename when the
            # extracted directory is not already called "obj".
            if target_name != OBJ:
                os.rename(extracted_path, obj_path)
        else:
            subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" %
                             (tar_path, cxx_cov_path), shell=True).communicate()
            subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate()
            if target_name != OBJ:
                subprocess.Popen("mv %s %s" % (extracted_path, obj_path),
                                 shell=True).communicate()

    def _obtain_fuzz_corpus(self):
        """Tar the ``corpus`` directory on the device and pull the archive."""
        command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;"
        self.config.device.execute_shell_command(command)
        result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath)
        LOG.info(f"fuzz_dir = {result_save_path}")
        self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path)

    def _obtain_benchmark_result(self):
        """Pull the suite's benchmark JSON into ``<report>/benchmark/...``.

        The target directory is removed again when no JSON file arrived.

        :return: the benchmark directory path.
        """
        benchmark_root_dir = os.path.abspath(
            os.path.join(self.result_rootpath, "benchmark"))
        benchmark_dir = os.path.abspath(
            os.path.join(benchmark_root_dir,
                         self.get_result_sub_save_path(),
                         self.testsuite_name))

        # exist_ok avoids the check-then-create race between callers.
        os.makedirs(benchmark_dir, exist_ok=True)

        LOG.info("benchmark_dir = %s" % benchmark_dir)
        json_name = "%s.json" % self.testsuite_name
        self.device.pull_file(os.path.join(self.device_testpath, json_name),
                              benchmark_dir)
        if not os.path.exists(os.path.join(benchmark_dir, json_name)):
            os.rmdir(benchmark_dir)
        return benchmark_dir
535
536
537@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
538class CppTestDriver(IDriver):
539    """
540    CppTest is a Test that runs a native test package on given device.
541    """
542    # test driver config
543    config = None
544    result = ""
545
546    def __check_environment__(self, device_options):
547        if len(device_options) == 1 and device_options[0].label is None:
548            return True
549        if len(device_options) != 1 or \
550                device_options[0].label != DeviceLabelType.phone:
551            return False
552        return True
553
554    def __check_config__(self, config):
555        pass
556
557    def __result__(self):
558        return self.result if os.path.exists(self.result) else ""
559
560    def __execute__(self, request):
561        try:
562            self.config = request.config
563            self.config.target_test_path = DEFAULT_TEST_PATH
564            self.config.device = request.config.environment.devices[0]
565
566            suite_file = request.root.source.source_file
567            LOG.debug("Testsuite FilePath: %s" % suite_file)
568
569            if not suite_file:
570                LOG.error("test source '%s' not exists" %
571                          request.root.source.source_string)
572                return
573
574            if not self.config.device:
575                result = ResultManager(suite_file, self.config)
576                result.set_is_coverage(False)
577                result.make_empty_result_file(
578                    "No test device is found. ")
579                return
580            self.config.device.set_device_report_path(request.config.report_path)
581            self.config.device.device_log_collector.start_hilog_task()
582            self._init_gtest()
583            self._run_gtest(suite_file)
584
585        finally:
586            log_path = get_result_savepath(request.root.source.source_file, request.config.report_path)
587            suit_name = os.path.basename(request.root.source.source_file)
588            xml_path = os.path.join(log_path, f"{suit_name}.xml")
589            if not os.path.exists(xml_path):
590                _create_empty_result_file(xml_path, suit_name, "ERROR")
591            serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns())
592            log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace(
593                ":", "_"))
594            try:
595                self.config.device.device_log_collector.stop_hilog_task(
596                    log_tar_file_name, module_name=request.get_module_name())
597            finally:
598                update_xml(request.root.source.source_file, xml_path)
599
600    @staticmethod
601    def _alter_init(name):
602        with open(name, "rb") as f:
603            lines = f.read()
604        str_content = lines.decode("utf-8")
605
606        pattern_sharp = '^\s*#.*$(\n|\r\n)'
607        pattern_star = '/\*.*?\*/(\n|\r\n)+'
608        pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
609
610        if re.match(pattern_sharp, str_content, flags=re.M):
611            striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
612        elif re.findall(pattern_star, str_content, flags=re.S):
613            striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
614        elif re.findall(pattern_xml, str_content, flags=re.S):
615            striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
616        else:
617            striped_content = str_content
618
619        striped_bt = striped_content.encode("utf-8")
620        if os.path.exists(name):
621            os.remove(name)
622        with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
623            f.write(striped_bt)
624
625    def _init_gtest(self):
626        self.config.device.connector_command("target mount")
627        self.config.device.execute_shell_command(
628            "rm -rf %s" % self.config.target_test_path)
629        self.config.device.execute_shell_command(
630            "mkdir -p %s" % self.config.target_test_path)
631        self.config.device.execute_shell_command(
632            "mount -o rw,remount,rw /")
633        if "fuzztest" == self.config.testtype[0]:
634            self.config.device.execute_shell_command(
635                "mkdir -p %s" % os.path.join(self.config.target_test_path,
636                                             "corpus"))
637
638    def _gtest_command(self, suite_file):
639        filename = os.path.basename(suite_file)
640        test_para = self._get_test_para(self.config.testcase,
641                                        self.config.testlevel,
642                                        self.config.testtype,
643                                        self.config.target_test_path,
644                                        suite_file,
645                                        filename)
646
647        # execute testcase
648        if not self.config.coverage:
649            if self.config.random == "random":
650                seed = random.randint(1, 100)
651                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % (
652                    self.config.target_test_path,
653                    filename,
654                    filename,
655                    test_para,
656                    seed)
657            else:
658                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
659                    self.config.target_test_path,
660                    filename,
661                    filename,
662                    test_para)
663        else:
664            coverage_outpath = self.config.coverage_outpath
665            if coverage_outpath:
666                strip_num = len(coverage_outpath.strip("/").split("/"))
667            else:
668                ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json")
669                with open(ohos_config_path, 'r') as json_file:
670                    json_info = json.load(json_file)
671                    out_path = json_info.get("out_path")
672                strip_num = len(out_path.strip("/").split("/"))
673            if "fuzztest" == self.config.testtype[0]:
674                self._push_corpus_cov_if_exist(suite_file)
675                command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \
676                                        rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \
677                                        GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}"
678            else:
679                command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \
680                          "GCOV_PREFIX_STRIP=%s ./%s %s" % \
681                          (self.config.target_test_path,
682                           filename,
683                           DEFAULT_TEST_PATH,
684                           str(strip_num),
685                           filename,
686                           test_para)
687
688        if self.config.hidelog:
689            command += " > {}.log 2>&1".format(filename)
690
691        return command
692
693    def _run_gtest(self, suite_file):
694        from xdevice import Variables
695        is_coverage_test = True if self.config.coverage else False
696
697        # push testsuite file
698        self.config.device.push_file(suite_file, self.config.target_test_path)
699        self.config.device.execute_shell_command(
700            "hilog -d %s" % (os.path.join(self.config.target_test_path,
701                                          os.path.basename(suite_file)))
702        )
703        self._push_corpus_if_exist(suite_file)
704
705        # push resource files
706        resource_manager = ResourceManager()
707        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
708        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
709                                               self.config.device)
710
711        command = self._gtest_command(suite_file)
712
713        result = ResultManager(suite_file, self.config)
714        result.set_is_coverage(is_coverage_test)
715
716        try:
717            # get result
718            if self.config.hidelog:
719                return_message = ""
720                display_receiver = CollectingOutputReceiver()
721                self.config.device.execute_shell_command(
722                    command,
723                    receiver=display_receiver,
724                    timeout=TIME_OUT,
725                    retry=0)
726            else:
727                display_receiver = DisplayOutputReceiver()
728                self.config.device.execute_shell_command(
729                    command,
730                    receiver=display_receiver,
731                    timeout=TIME_OUT,
732                    retry=0)
733                return_message = display_receiver.output
734        except (ExecuteTerminate, DeviceError) as exception:
735            return_message = str(exception.args)
736
737        if self.config.hidelog:
738            self.result = result.get_test_results_hidelog(return_message)
739        else:
740            self.result = result.get_test_results(return_message)
741
742        resource_manager.process_cleaner_data(resource_data_dic,
743                                              resource_dir,
744                                              self.config.device)
745
746    def _push_corpus_cov_if_exist(self, suite_file):
747        corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep)
748        cov_file = os.path.join(
749            sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz")
750        LOG.info("corpus_cov file :%s" % str(cov_file))
751        self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path))
752
753    def _push_corpus_if_exist(self, suite_file):
754        if "fuzztest" == self.config.testtype[0]:
755            corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus")
756            if not os.path.isdir(corpus_path):
757                return
758
759            corpus_dirs = []
760            corpus_file_list = []
761
762            for root, _, files in os.walk(corpus_path):
763                if not files:
764                    continue
765
766                corpus_dir = root.split("corpus")[-1]
767                if corpus_dir != "":
768                    corpus_dirs.append(corpus_dir)
769
770                for file in files:
771                    cp_file = os.path.normcase(os.path.join(root, file))
772                    corpus_file_list.append(cp_file)
773                    if file == "init":
774                        self._alter_init(cp_file)
775
776            # mkdir corpus files dir
777            if corpus_dirs:
778                for corpus in corpus_dirs:
779                    mkdir_corpus_command = f"shell; mkdir -p {corpus}"
780                    self.config.device.connector_command(mkdir_corpus_command)
781
782            # push corpus file
783            if corpus_file_list:
784                for corpus_file in corpus_file_list:
785                    self.config.device.push_file(corpus_file,
786                                                 os.path.join(self.config.target_test_path, "corpus"))
787
788    def _get_test_para(self,
789                       testcase,
790                       testlevel,
791                       testtype,
792                       target_test_path,
793                       suite_file,
794                       filename):
795        if "benchmark" == testtype[0]:
796            test_para = (" --benchmark_out_format=json"
797                         " --benchmark_out=%s%s.json") % (
798                            target_test_path, filename)
799            return test_para
800
801        if "" != testcase and "" == testlevel:
802            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
803        elif "" == testcase and "" != testlevel:
804            level_para = get_level_para_string(testlevel)
805            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
806        else:
807            test_para = ""
808
809        if "fuzztest" == testtype[0]:
810            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
811                suite_file), "project.xml")).get_fuzzer_config("fuzztest")
812            LOG.info("config list :%s" % str(cfg_list))
813            if self.config.coverage:
814                test_para += "corpus -runs=0" + \
815                             " -max_len=" + cfg_list[0] + \
816                             " -max_total_time=" + cfg_list[1] + \
817                             " -rss_limit_mb=" + cfg_list[2]
818            else:
819                test_para += "corpus -max_len=" + cfg_list[0] + \
820                             " -max_total_time=" + cfg_list[1] + \
821                             " -rss_limit_mb=" + cfg_list[2]
822
823        return test_para
824
825
826@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
827class JSUnitTestDriver(IDriver):
828    """
829    JSUnitTestDriver is a Test that runs a native test package on given device.
830    """
831
832    def __init__(self):
833        self.config = None
834        self.result = ""
835        self.start_time = None
836        self.ability_name = ""
837        self.package_name = ""
838        # log
839        self.hilog = None
840        self.hilog_proc = None
841
842    def __check_environment__(self, device_options):
843        pass
844
845    def __check_config__(self, config):
846        pass
847
848    def __result__(self):
849        return self.result if os.path.exists(self.result) else ""
850
851    def __execute__(self, request):
852        try:
853            LOG.info("developer_test driver")
854            self.config = request.config
855            self.config.target_test_path = DEFAULT_TEST_PATH
856            self.config.device = request.config.environment.devices[0]
857
858            suite_file = request.root.source.source_file
859            result_save_path = get_result_savepath(suite_file, self.config.report_path)
860            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
861            if not suite_file:
862                LOG.error("test source '%s' not exists" %
863                          request.root.source.source_string)
864                return
865
866            if not self.config.device:
867                result = ResultManager(suite_file, self.config)
868                result.set_is_coverage(False)
869                result.make_empty_result_file(
870                    "No test device is found")
871                return
872
873            package_name, ability_name = self._get_package_and_ability_name(
874                suite_file)
875            self.package_name = package_name
876            self.ability_name = ability_name
877            self.config.test_hap_out_path = \
878                "/data/data/%s/files/" % self.package_name
879            self.config.device.connector_command("shell hilog -r")
880
881            self.hilog = get_device_log_file(
882                request.config.report_path,
883                request.config.device.__get_serial__() + "_" + request.
884                get_module_name(),
885                "device_hilog")
886
887            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
888                                 0o755)
889
890            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
891                self.config.device.device_log_collector.add_log_address(None, self.hilog)
892                _, self.hilog_proc = self.config.device.device_log_collector.\
893                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
894                self._init_jsunit_test()
895                self._run_jsunit(suite_file, self.hilog)
896                hilog_file_pipe.flush()
897                self.generate_console_output(self.hilog, request)
898                xml_path = os.path.join(
899                    request.config.report_path, "result",
900                    '.'.join((request.get_module_name(), "xml")))
901                shutil.move(xml_path, self.result)
902        finally:
903            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
904            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
905            update_xml(request.root.source.source_file, self.result)
906
907    @staticmethod
908    def _get_acts_test_para(testcase,
909                            testlevel,
910                            testtype,
911                            target_test_path,
912                            suite_file,
913                            filename):
914        if "actstest" == testtype[0]:
915            test_para = (" --actstest_out_format=json"
916                         " --actstest_out=%s%s.json") % (
917                            target_test_path, filename)
918            return test_para
919
920        if "" != testcase and "" == testlevel:
921            test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
922        elif "" == testcase and "" != testlevel:
923            level_para = get_level_para_string(testlevel)
924            test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
925        else:
926            test_para = ""
927        return test_para
928
929    @staticmethod
930    def _get_hats_test_para(testcase,
931                            testlevel,
932                            testtype,
933                            target_test_path,
934                            suite_file,
935                            filename):
936        if "hatstest" == testtype[0]:
937            test_hats_para = (" --hatstest_out_format=json"
938                         " --hatstest_out=%s%s.json") % (
939                            target_test_path, filename)
940            return test_hats_para
941
942        if "" != testcase and "" == testlevel:
943            test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
944        elif "" == testcase and "" != testlevel:
945            level_para = get_level_para_string(testlevel)
946            test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
947        else:
948            test_hats_para = ""
949        return test_hats_para
950
951    @classmethod
952    def _get_json_shell_timeout(cls, json_filepath):
953        test_timeout = 0
954        try:
955            with open(json_filepath, 'r') as json_file:
956                data_dic = json.load(json_file)
957                if not data_dic:
958                    return test_timeout
959                else:
960                    if "driver" in data_dic.keys():
961                        driver_dict = data_dic.get("driver")
962                        if driver_dict and "test-timeout" in driver_dict.keys():
963                            test_timeout = int(driver_dict["shell-timeout"]) / 1000
964                    return test_timeout
965        except JSONDecodeError:
966            return test_timeout
967        finally:
968            print(" get json shell timeout finally")
969
970    @staticmethod
971    def _get_package_and_ability_name(hap_filepath):
972        package_name = ""
973        ability_name = ""
974        if os.path.exists(hap_filepath):
975            filename = os.path.basename(hap_filepath)
976
977            # unzip the hap file
978            hap_bak_path = os.path.abspath(os.path.join(
979                os.path.dirname(hap_filepath),
980                "%s.bak" % filename))
981            zf_desc = zipfile.ZipFile(hap_filepath)
982            try:
983                zf_desc.extractall(path=hap_bak_path)
984            except RuntimeError as error:
985                print("Unzip error: ", hap_bak_path)
986            zf_desc.close()
987
988            # verify config.json file
989            app_profile_path = os.path.join(hap_bak_path, "config.json")
990            if not os.path.exists(app_profile_path):
991                print("file %s not exist" % app_profile_path)
992                return package_name, ability_name
993
994            if os.path.isdir(app_profile_path):
995                print("%s is a folder, and not a file" % app_profile_path)
996                return package_name, ability_name
997
998            # get package_name and ability_name value
999            load_dict = {}
1000            with open(app_profile_path, 'r') as load_f:
1001                load_dict = json.load(load_f)
1002            profile_list = load_dict.values()
1003            for profile in profile_list:
1004                package_name = profile.get("package")
1005                if not package_name:
1006                    continue
1007                abilities = profile.get("abilities")
1008                for abilitie in abilities:
1009                    abilities_name = abilitie.get("name")
1010                    if abilities_name.startswith("."):
1011                        ability_name = package_name + abilities_name[
1012                                                      abilities_name.find("."):]
1013                    else:
1014                        ability_name = abilities_name
1015                    break
1016                break
1017
1018            # delete hap_bak_path
1019            if os.path.exists(hap_bak_path):
1020                shutil.rmtree(hap_bak_path)
1021        else:
1022            print("file %s not exist" % hap_filepath)
1023        return package_name, ability_name
1024
1025    def generate_console_output(self, device_log_file, request):
1026        result_message = self.read_device_log(device_log_file)
1027
1028        report_name = request.get_module_name()
1029        parsers = get_plugin(
1030            Plugin.PARSER, CommonParserType.jsunit)
1031        if parsers:
1032            parsers = parsers[:1]
1033        for listener in request.listeners:
1034            listener.device_sn = self.config.device.device_sn
1035        parser_instances = []
1036
1037        for parser in parsers:
1038            parser_instance = parser.__class__()
1039            parser_instance.suites_name = report_name
1040            parser_instance.suite_name = report_name
1041            parser_instance.listeners = request.listeners
1042            parser_instances.append(parser_instance)
1043        handler = ShellHandler(parser_instances)
1044        process_command_ret(result_message, handler)
1045
1046    def read_device_log(self, device_log_file):
1047        result_message = ""
1048        with open(device_log_file, "r", encoding='utf-8',
1049                  errors='ignore') as file_read_pipe:
1050            while True:
1051                data = file_read_pipe.readline()
1052                if not data:
1053                    break
1054                # only filter JSApp log
1055                if data.lower().find(_ACE_LOG_MARKER) != -1:
1056                    result_message += data
1057                    if data.find("[end] run suites end") != -1:
1058                        break
1059        return result_message
1060
1061    def start_hap_execute(self):
1062        try:
1063            command = "aa start -d 123 -a %s.MainAbility -b %s" \
1064                      % (self.package_name, self.package_name)
1065            self.start_time = time.time()
1066            result_value = self.config.device.execute_shell_command(
1067                command, timeout=TIME_OUT)
1068
1069            if "success" in str(result_value).lower():
1070                LOG.info("execute %s's testcase success. result value=%s"
1071                         % (self.package_name, result_value))
1072            else:
1073                LOG.info("execute %s's testcase failed. result value=%s"
1074                         % (self.package_name, result_value))
1075
1076            _sleep_according_to_result(result_value)
1077            return_message = result_value
1078        except (ExecuteTerminate, DeviceError) as exception:
1079            return_message = exception.args
1080
1081        return return_message
1082
1083    def _init_jsunit_test(self):
1084        self.config.device.connector_command("target mount")
1085        self.config.device.execute_shell_command(
1086            "rm -rf %s" % self.config.target_test_path)
1087        self.config.device.execute_shell_command(
1088            "mkdir -p %s" % self.config.target_test_path)
1089        self.config.device.execute_shell_command(
1090            "mount -o rw,remount,rw /")
1091
1092    def _run_jsunit(self, suite_file, device_log_file):
1093        filename = os.path.basename(suite_file)
1094        _, suffix_name = os.path.splitext(filename)
1095
1096        resource_manager = ResourceManager()
1097        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
1098        if suffix_name == ".hap":
1099            json_file_path = suite_file.replace(".hap", ".json")
1100            if os.path.exists(json_file_path):
1101                timeout = self._get_json_shell_timeout(json_file_path)
1102            else:
1103                timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1104        else:
1105            timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1106        resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device)
1107        main_result = self._install_hap(suite_file)
1108        result = ResultManager(suite_file, self.config)
1109        if main_result:
1110            self._execute_hapfile_jsunittest()
1111            try:
1112                status = False
1113                actiontime = JS_TIMEOUT
1114                times = CYCLE_TIMES
1115                if timeout:
1116                    actiontime = timeout
1117                    times = 1
1118                with open(device_log_file, "r", encoding='utf-8',
1119                          errors='ignore') as file_read_pipe:
1120                    for i in range(0, times):
1121                        if status:
1122                            break
1123                        else:
1124                            time.sleep(float(actiontime))
1125                        start_time = int(time.time())
1126                        while True:
1127                            data = file_read_pipe.readline()
1128                            if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1:
1129                                LOG.info("execute testcase successfully.")
1130                                status = True
1131                                break
1132                            if int(time.time()) - start_time > 5:
1133                                break
1134            finally:
1135                _lock_screen(self.config.device)
1136                self._uninstall_hap(self.package_name)
1137        else:
1138            self.result = result.get_test_results("Error: install hap failed")
1139            LOG.error("Error: install hap failed")
1140
1141        resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device)
1142
1143    def _execute_hapfile_jsunittest(self):
1144        _unlock_screen(self.config.device)
1145        _unlock_device(self.config.device)
1146
1147        try:
1148            return_message = self.start_hap_execute()
1149        except (ExecuteTerminate, DeviceError) as exception:
1150            return_message = str(exception.args)
1151
1152        return return_message
1153
1154    def _install_hap(self, suite_file):
1155        message = self.config.device.connector_command("install %s" % suite_file)
1156        message = str(message).rstrip()
1157        if message == "" or "success" in message:
1158            return_code = True
1159            if message != "":
1160                LOG.info(message)
1161        else:
1162            return_code = False
1163            if message != "":
1164                LOG.warning(message)
1165
1166        _sleep_according_to_result(return_code)
1167        return return_code
1168
1169    def _uninstall_hap(self, package_name):
1170        return_message = self.config.device.execute_shell_command(
1171            "bm uninstall -n %s" % package_name)
1172        _sleep_according_to_result(return_message)
1173        return return_message
1174
1175
1176@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
1177class OHRustTestDriver(IDriver):
1178    def __init__(self):
1179        self.result = ""
1180        self.error_message = ""
1181        self.config = None
1182
1183    def __check_environment__(self, device_options):
1184        pass
1185
1186    def __check_config__(self, config):
1187        pass
1188
1189    def __execute__(self, request):
1190        try:
1191            LOG.debug("Start to execute open harmony rust test")
1192            self.config = request.config
1193            self.config.device = request.config.environment.devices[0]
1194            self.config.target_test_path = DEFAULT_TEST_PATH
1195            suite_file = request.root.source.source_file
1196            LOG.debug("Testsuite filepath:{}".format(suite_file))
1197
1198            if not suite_file:
1199                LOG.error("test source '{}' not exists".format(
1200                    request.root.source.source_string))
1201                return
1202
1203            result_save_path = get_result_savepath(suite_file, self.config.report_path)
1204            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
1205            self.config.device.set_device_report_path(request.config.report_path)
1206            self.config.device.device_log_collector.start_hilog_task()
1207            self._init_oh_rust()
1208            self._run_oh_rust(suite_file, request)
1209        except Exception as exception:
1210            self.error_message = exception
1211            if not getattr(exception, "error_no", ""):
1212                setattr(exception, "error_no", "03409")
1213            LOG.exception(self.error_message, exc_info=False, error_no="03409")
1214        finally:
1215            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
1216                                    time.time_ns())
1217            log_tar_file_name = "{}_{}".format(
1218                request.get_module_name(), str(serial).replace(":", "_"))
1219            try:
1220                self.config.device.device_log_collector.stop_hilog_task(
1221                    log_tar_file_name, module_name=request.get_module_name())
1222            finally:
1223                xml_path = os.path.join(
1224                    request.config.report_path, "result",
1225                    '.'.join((request.get_module_name(), "xml")))
1226                shutil.move(xml_path, self.result)
1227                update_xml(request.root.source.source_file, self.result)
1228                self.result = check_result_report(
1229                    request.config.report_path, self.result, self.error_message)
1230
1231    def __result__(self):
1232        return self.result if os.path.exists(self.result) else ""
1233
1234    def _init_oh_rust(self):
1235        self.config.device.connector_command("target mount")
1236        self.config.device.execute_shell_command(
1237            "rm -rf %s" % self.config.target_test_path)
1238        self.config.device.execute_shell_command(
1239            "mkdir -p %s" % self.config.target_test_path)
1240        self.config.device.execute_shell_command(
1241            "mount -o rw,remount,rw /")
1242
1243    def _run_oh_rust(self, suite_file, request=None):
1244        self.config.device.push_file(suite_file, self.config.target_test_path)
1245        self.config.device.execute_shell_command(
1246            "hilog -d %s" % (os.path.join(self.config.target_test_path,
1247                                          os.path.basename(suite_file)))
1248        )
1249        resource_manager = ResourceManager()
1250        resource_data_dict, resource_dir = \
1251            resource_manager.get_resource_data_dic(suite_file)
1252        resource_manager.process_preparer_data(resource_data_dict,
1253                                               resource_dir,
1254                                               self.config.device)
1255        for listener in request.listeners:
1256            listener.device_sn = self.config.device.device_sn
1257
1258        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
1259        if parsers:
1260            parsers = parsers[:1]
1261        parser_instances = []
1262        for parser in parsers:
1263            parser_instance = parser.__class__()
1264            parser_instance.suite_name = request.get_module_name()
1265            parser_instance.listeners = request.listeners
1266            parser_instances.append(parser_instance)
1267        handler = ShellHandler(parser_instances)
1268        if self.config.coverage:
1269            command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format(
1270                self.config.target_test_path, os.path.basename(suite_file))
1271        else:
1272            command = "cd {}; chmod +x *; ./{}".format(
1273                self.config.target_test_path, os.path.basename(suite_file))
1274        self.config.device.execute_shell_command(
1275            command, timeout=TIME_OUT, receiver=handler, retry=0)
1276        if self.config.coverage:
1277            result = ResultManager(suite_file, self.config)
1278            result.obtain_coverage_data()
1279        resource_manager.process_cleaner_data(resource_data_dict, resource_dir,
1280                                              self.config.device)
1281