• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import re
22import shutil
23import subprocess
24import sys
25import time
26import platform
27import zipfile
28import stat
29import random
30import xml.etree.ElementTree as ET
31from dataclasses import dataclass
32from json import JSONDecodeError
33
34from xdevice import DeviceTestType, check_result_report
35from xdevice import DeviceLabelType
36from xdevice import CommonParserType
37from xdevice import ExecuteTerminate
38from xdevice import DeviceError
39from xdevice import ShellHandler
40
41from xdevice import IDriver
42from xdevice import platform_logger
43from xdevice import Plugin
44from xdevice import get_plugin
45from ohos.environment.dmlib import process_command_ret
46from core.utils import get_decode
47from core.utils import get_fuzzer_path
48from core.config.resource_manager import ResourceManager
49from core.config.config_manager import FuzzerConfigManager
50
# Names exported by a star-import of this module.
__all__ = ["CppTestDriver", "JSUnitTestDriver", "disable_keyguard", "GTestConst"]

LOG = platform_logger("Drivers")

# Device-side working directory; evaluates to "/data/test/".
DEFAULT_TEST_PATH = "/{}/{}/".format("data", "test")
# Name of the device directory that is archived when coverage data is pulled.
OBJ = "obj"
# Passed as ``timeout=`` to device shell/pull calls in this file
# (unit is defined by the device API -- presumably milliseconds, TODO confirm).
TIME_OUT = 900 * 1000

# The three values below are not referenced in the visible part of this file.
_ACE_LOG_MARKER = " a0c0d0"
JS_TIMEOUT = 10
CYCLE_TIMES = 50

# os.open() arguments for report files: write-only, create, fail if the file
# already exists; readable and writable by the owner only.
FLAGS = os.O_EXCL | os.O_CREAT | os.O_WRONLY
MODES = stat.S_IRUSR | stat.S_IWUSR
67
68
class CollectingOutputReceiver:
    """Shell-output receiver that accumulates every chunk it is given."""

    def __init__(self):
        self.output = ""

    def __read__(self, output):
        """Append *output* to the text collected so far."""
        self.output = f"{self.output}{output}"

    def __error__(self, message):
        """Error callback; intentionally a no-op."""
        return None

    def __done__(self, result_code="", message=""):
        """Completion callback; intentionally a no-op."""
        return None
81
82
class DisplayOutputReceiver:
    """Shell-output receiver that stores the output and logs complete lines."""

    def __init__(self):
        self.output = ""
        # Trailing text of the last chunk that was not yet terminated.
        self.unfinished_line = ""

    def __read__(self, output):
        """Store *output* and log each non-blank complete line it finishes."""
        self.output = f"{self.output}{output}"
        for raw_line in self._process_output(output):
            text = raw_line.strip()
            if text:
                LOG.info(get_decode(text))

    def __error__(self, message):
        """Error callback; intentionally a no-op."""
        return None

    def __done__(self, result_code="", message=""):
        """Completion callback; intentionally a no-op."""
        return None

    def _process_output(self, output, end_mark="\n"):
        """Split *output* into complete lines, buffering a partial last line.

        Text left over from the previous call is prepended first.  Everything
        after the final *end_mark* is kept in ``unfinished_line`` for the next
        call and is not part of the returned list.
        """
        pending = self.unfinished_line
        if pending:
            content = "".join((pending, output))
            self.unfinished_line = ""
        else:
            content = output
        *complete_lines, tail = content.split(end_mark)
        if not content.endswith(end_mark):
            self.unfinished_line = tail
        return complete_lines
113
114
@dataclass
class GTestConst:
    """Option names used when assembling test-binary arguments."""

    # gtest-style options.
    exec_para_filter = "--gtest_filter"
    exec_para_level = "--gtest_testsize"

    # jstest-style counterparts.
    exec_acts_para_filter = "--jstest_filter"
    exec_acts_para_level = "--jstest_testsize"
121
122
def get_device_log_file(report_path, serial=None, log_name="device_log"):
    """Return the path of a device log file inside the report's log directory.

    The log directory is created if necessary.  The file itself is not
    created; only its path ``<log_name>_<serial>.log`` is returned.
    """
    from xdevice import Variables

    log_dir = os.path.join(report_path, Variables.report_vars.log_dir)
    os.makedirs(log_dir, exist_ok=True)

    # Without a serial, a nanosecond timestamp is used in the file name.
    suffix = serial if serial else time.time_ns()
    return os.path.join(log_dir, f"{log_name}_{suffix}.log")
132
133
def get_level_para_string(level_string):
    """Convert a comma-separated list of levels into ``LevelN,LevelM``.

    Each entry has surrounding spaces removed *before* it is validated, so
    ``"1, 2"`` yields ``"Level1,Level2"``.  Entries that are not purely
    digits are ignored, duplicates are dropped, and the first-seen order of
    the input is preserved so the result is deterministic.

    :param level_string: e.g. ``"0,1,2"``
    :return: comma-joined ``Level<digit>`` names, or ``""`` if none are valid
    """
    # dict.fromkeys de-duplicates while keeping insertion order.
    levels = dict.fromkeys(
        item.strip(" ") for item in level_string.split(","))
    return ",".join(f"Level{item}" for item in levels if item.isdigit())
144
145
def get_result_savepath(testsuit_path, result_rootpath):
    """Return (and create) the host directory for a suite's result files.

    The directory is ``<result_rootpath>/result``.  When the suite's
    directory contains a ``tests`` path component followed by at least two
    further components, everything after the first of them is appended.
    """
    marker = os.sep + "tests" + os.sep
    suite_dir = os.path.split(testsuit_path)[0]

    # Text after the first "<sep>tests<sep>", then drop its leading component.
    _, marker_found, after_tests = suite_dir.partition(marker)
    _, sep_found, subpath = after_tests.partition(os.sep)

    if marker_found and sep_found:
        result_path = os.path.join(result_rootpath, "result", subpath)
    else:
        result_path = os.path.join(result_rootpath, "result")

    if not os.path.exists(result_path):
        os.makedirs(result_path)

    LOG.info("result_savepath = " + result_path)
    return result_path
166
167
def get_test_log_savepath(result_rootpath, result_suit_path):
    """Return (and create) ``<result_rootpath>/log/test_log/<first component>``.

    ``<first component>`` is the first path component that follows the last
    occurrence of the text ``result`` in *result_suit_path*.
    """
    after_result = result_suit_path.rpartition("result")[2]
    suit_path = after_result.strip(os.sep).partition(os.sep)[0]
    test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path)

    if not os.path.exists(test_log_path):
        os.makedirs(test_log_path)

    LOG.info("test_log_savepath = {}".format(test_log_path))
    return test_log_path
177
178
def update_xml(suite_file, result_xml):
    """Copy the suite's recorded path into the result XML root element.

    The path is read from the first line of ``<suite>_path.txt`` located next
    to *suite_file*, where ``<suite>`` is the suite file name up to its first
    dot.  Any ``//`` in that line is removed and the result is stored in the
    ``path`` attribute of the root element of *result_xml*, which is rewritten
    in place.  Nothing happens if either file is missing.

    :param suite_file: host path of the test suite file
    :param result_xml: host path of the result XML to annotate
    """
    suite_dir, suite_name = os.path.split(suite_file)
    # Cut at the first dot of the file NAME only: splitting the whole path
    # would truncate it at a dotted directory such as "./out" or "v1.2".
    suite_path_txt = os.path.join(
        suite_dir, suite_name.split(".")[0] + "_path.txt")
    if not (os.path.exists(suite_path_txt) and os.path.exists(result_xml)):
        return

    with open(suite_path_txt, "r") as path_text:
        line = path_text.readline().replace("//", "").strip()
    tree = ET.parse(result_xml)
    tree.getroot().attrib["path"] = line
    tree.write(result_xml)
187
188
# Result XML shared by every test suite that could not deliver a real report
def _create_empty_result_file(filepath, filename, error_message):
    """Write an "unavailable" result XML to *filepath* unless it exists.

    The characters ``"``, ``<``, ``>`` and ``&`` are removed from the message
    before it is embedded as an attribute.  For ``.hap`` suites the suite name
    is cut at its first dot.
    """
    message = str(error_message)
    for forbidden in ("\"", "<", ">", "&"):
        message = message.replace(forbidden, "")
    if filename.endswith(".hap"):
        filename = filename.split(".")[0]
    if os.path.exists(filepath):
        return

    with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as report:
        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        report.writelines((
            '<?xml version="1.0" encoding="UTF-8"?>\n',
            '<testsuites tests="0" failures="0" '
            'disabled="0" errors="0" timestamp="%s" '
            'time="0" name="AllTests">\n' % now,
            '  <testsuite name="%s" tests="0" failures="0" '
            'disabled="0" errors="0" time="0.0" '
            'unavailable="1" message="%s">\n' % (filename, message),
            '  </testsuite>\n',
            '</testsuites>\n',
        ))
215
216
def _unlock_screen(device):
    """Run ``svc power stayon true`` on *device*, then wait one second."""
    command = "svc power stayon true"
    device.execute_shell_command(command)
    time.sleep(1)
220
221
def _unlock_device(device):
    """Send key event 82 and dismiss the keyguard, pausing 1 s after each."""
    for command in ("input keyevent 82", "wm dismiss-keyguard"):
        device.execute_shell_command(command)
        time.sleep(1)
227
228
def _lock_screen(device):
    """Run ``svc power stayon false`` on *device*, then wait one second."""
    command = "svc power stayon false"
    device.execute_shell_command(command)
    time.sleep(1)
232
233
def disable_keyguard(device):
    """Keep the screen of *device* on, then unlock the device."""
    for step in (_unlock_screen, _unlock_device):
        step(device)
237
238
239def _sleep_according_to_result(result):
240    if result:
241        time.sleep(1)
242
243
def _create_fuzz_crash_file(filepath, filename):
    """Write a one-case failing fuzz report to *filepath* unless it exists.

    The single test case is named *filename* and carries a failure element
    pointing the reader to the log file.
    """
    if os.path.exists(filepath):
        return

    with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as report:
        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        report.writelines((
            '<?xml version="1.0" encoding="UTF-8"?>\n',
            '<testsuites disabled="0" name="AllTests" '
            'time="300" timestamp="%s" errors="0" '
            'failures="1" tests="1">\n' % now,
            '  <testsuite disabled="0" name="%s" time="300" '
            'errors="0" failures="1" tests="1">\n' % filename,
            '    <testcase name="%s" time="300" classname="%s" '
            'status="run">\n' % (filename, filename),
            '      <failure type="" '
            'message="Fuzzer crash. See ERROR in log file">\n',
            '      </failure>\n',
            '    </testcase>\n',
            '  </testsuite>\n',
            '</testsuites>\n',
        ))
268
269
def _create_fuzz_pass_file(filepath, filename):
    """Write a one-case passing fuzz report to *filepath* unless it exists."""
    if os.path.exists(filepath):
        return

    with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as report:
        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        report.writelines((
            '<?xml version="1.0" encoding="UTF-8"?>\n',
            '<testsuites disabled="0" name="AllTests" '
            'time="300" timestamp="%s" errors="0" '
            'failures="0" tests="1">\n' % now,
            '  <testsuite disabled="0" name="%s" time="300" '
            'errors="0" failures="0" tests="1">\n' % filename,
            '    <testcase name="%s" time="300" classname="%s" '
            'status="run"/>\n' % (filename, filename),
            '  </testsuite>\n',
            '</testsuites>\n',
        ))
289
290
def _create_fuzz_result_file(filepath, filename, error_message):
    """Classify fuzzer output and write the matching result XML.

    * output mentioning ``AddressSanitizer``      -> crash report
    * output with ``Done <n> runs in <m> second`` -> pass report
    * anything else                               -> "unavailable" report

    The characters ``"``, ``<``, ``>`` and ``&`` are removed from the message
    before it is inspected.
    """
    message = str(error_message)
    for forbidden in ("\"", "<", ">", "&"):
        message = message.replace(forbidden, "")

    if "AddressSanitizer" in message:
        LOG.error("FUZZ TEST CRASH")
        _create_fuzz_crash_file(filepath, filename)
        return

    finished = re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
                         message, re.M)
    if finished is not None:
        LOG.info("FUZZ TEST PASS")
        _create_fuzz_pass_file(filepath, filename)
        return

    LOG.error("FUZZ TEST UNAVAILABLE")
    _create_empty_result_file(filepath, filename, message)
308
309
class ResultManager(object):
    """Fetch the result artifacts of one test suite from the device.

    Result files, hidden test logs, benchmark output, fuzz corpora and
    coverage data are pulled from the device into directories derived from
    ``config.report_path``.
    """

    def __init__(self, testsuit_path, config):
        """Remember the suite path and the pieces of *config* used later.

        :param testsuit_path: host path of the test suite file
        :param config: driver configuration; ``report_path``, ``device`` and
            either ``test_hap_out_path`` (for ``.hap`` suites) or
            ``target_test_path`` are read here
        """
        self.testsuite_path = testsuit_path
        self.config = config
        self.result_rootpath = self.config.report_path
        self.device = self.config.device
        if testsuit_path.endswith(".hap"):
            self.device_testpath = self.config.test_hap_out_path
        else:
            self.device_testpath = self.config.target_test_path
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable collection of coverage data."""
        self.is_coverage = is_coverage

    def get_test_results(self, error_message=""):
        """Pull the result file and return its host path.

        :param error_message: command output or error text; used to classify
            fuzz runs and as the message of a generated placeholder report
        """
        filepath, _ = self.obtain_test_result_file()
        return self._finalize_results(filepath, error_message)

    def get_test_results_hidelog(self, error_message=""):
        """Like :meth:`get_test_results`, for runs whose output went to a log.

        When *error_message* is empty, the lines of the pulled test log are
        used as the message instead; a missing log is reported via ``LOG``.
        """
        result_file_path, test_log_path = self.obtain_test_result_file()
        log_content = ""
        if error_message:
            log_content = error_message
        elif os.path.exists(test_log_path):
            with open(test_log_path, "r") as log:
                log_content = log.readlines()
        else:
            LOG.error("{}: Test log not exist.".format(test_log_path))
        return self._finalize_results(result_file_path, log_content)

    def _finalize_results(self, result_file_path, message):
        """Shared tail of both ``get_test_results*`` methods.

        Creates fuzz / placeholder reports as needed, then pulls benchmark
        and coverage artifacts, and returns *result_file_path*.
        """
        test_type = self.config.testtype[0]
        if "fuzztest" == test_type:
            LOG.info("create fuzz test report")
            _create_fuzz_result_file(result_file_path, self.testsuite_name,
                                     message)
            if not self.is_coverage:
                self._obtain_fuzz_corpus()

        # No report from the device (or the fuzz classifier): write a stub.
        if not os.path.exists(result_file_path):
            _create_empty_result_file(result_file_path, self.testsuite_name,
                                      message)
        if "benchmark" == test_type:
            self._obtain_benchmark_result()
        # Get coverage data files
        if self.is_coverage:
            self.obtain_coverage_data()

        return result_file_path

    def get_result_sub_save_path(self):
        """Return the part of the suite directory after ``/benchmark/``.

        An empty string is returned when the suite directory has no
        ``benchmark`` path component.
        """
        find_key = os.sep + "benchmark" + os.sep
        file_dir, _ = os.path.split(self.testsuite_path)
        pos = file_dir.find(find_key)
        subpath = ""
        if -1 != pos:
            subpath = file_dir[pos + len(find_key):]
        LOG.info("subpath = " + subpath)
        return subpath

    def obtain_test_result_file(self):
        """Pull the suite's result (and hidden log) from the device.

        For non-fuzz runs the remote XML result is preferred; if it does not
        exist the remote JSON result is pulled instead and its host path is
        returned.  With ``config.hidelog`` set, the remote ``<suite>.log`` is
        pulled as well.

        :return: ``(result_file_path, test_log_file_path)``; the second item
            is ``""`` unless ``config.hidelog`` is set
        """
        result_save_path = get_result_savepath(self.testsuite_path,
                                               self.result_rootpath)
        result_file_path = os.path.join(result_save_path,
                                        "%s.xml" % self.testsuite_name)
        result_json_file_path = os.path.join(result_save_path,
                                             "%s.json" % self.testsuite_name)

        # .hap suites write their XML under a fixed name on the device.
        if self.testsuite_path.endswith('.hap'):
            remote_result_name = "testcase_result.xml"
        else:
            remote_result_name = "%s.xml" % self.testsuite_name
        remote_result_file = os.path.join(self.device_testpath,
                                          remote_result_name)
        remote_json_result_file = os.path.join(self.device_testpath,
                                               "%s.json" % self.testsuite_name)

        if self.config.testtype[0] != "fuzztest":
            if self.device.is_file_exist(remote_result_file):
                self.device.pull_file(remote_result_file, result_file_path)
            elif self.device.is_file_exist(remote_json_result_file):
                self.device.pull_file(remote_json_result_file,
                                      result_json_file_path)
                result_file_path = result_json_file_path
            else:
                LOG.info("%s not exist", remote_result_file)

        if self.config.hidelog:
            remote_log_result_file = os.path.join(self.device_testpath,
                                                  "%s.log" % self.testsuite_name)
            test_log_save_path = get_test_log_savepath(self.result_rootpath,
                                                       result_save_path)
            test_log_file_path = os.path.join(test_log_save_path,
                                              "%s.log" % self.testsuite_name)
            self.device.pull_file(remote_log_result_file, test_log_file_path)
            return result_file_path, test_log_file_path

        return result_file_path, ""

    def make_empty_result_file(self, error_message=""):
        """Write a placeholder result XML for the suite if none exists yet."""
        result_savepath = get_result_savepath(self.testsuite_path,
                                              self.result_rootpath)
        result_filepath = os.path.join(result_savepath, "%s.xml" %
                                       self.testsuite_name)
        if not os.path.exists(result_filepath):
            _create_empty_result_file(result_filepath,
                                      self.testsuite_name, error_message)

    def is_exist_target_in_device(self, path, target):
        """Return True if ``ls -l <path> | grep <target>`` prints *target*."""
        if platform.system() == "Windows":
            # On a Windows host the whole pipeline is wrapped in quotes.
            command = '\"ls -l %s | grep %s\"' % (path, target)
        else:
            command = "ls -l %s | grep %s" % (path, target)

        stdout_info = self.device.execute_shell_command(command)
        return stdout_info != "" and stdout_info.find(target) != -1

    def obtain_coverage_data(self):
        """Archive the coverage directory on the device and unpack it locally.

        The device directory (``obj``, or ``lib.unstripped`` for suites whose
        name starts with ``rust_``) is tarred, pulled into
        ``<report>/../coverage/data/cxx/<suite>_<test type>`` and extracted
        there; the extracted directory always ends up named ``obj``.  If the
        destination already exists, a timestamp-suffixed sibling path is used
        instead.  Nothing happens if the device directory is missing.
        """
        tests_path = self.config.testcases_path
        test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0]
        cxx_cov_path = os.path.abspath(os.path.join(
            self.result_rootpath,
            "..",
            "coverage",
            "data",
            "cxx",
            self.testsuite_name + '_' + test_type))

        if os.path.basename(self.testsuite_name).startswith("rust_"):
            target_name = "lib.unstripped"
        else:
            target_name = OBJ
        if not self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
            return

        if not os.path.exists(cxx_cov_path):
            os.makedirs(cxx_cov_path)
        else:
            cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}"
        self.config.device.execute_shell_command(
            "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name))
        src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name)
        self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT)
        tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)
        if platform.system() == "Windows":
            process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True)
            process.communicate()
            os.remove(tar_path)
            os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ))
        else:
            # Argument lists instead of shell strings: host paths containing
            # spaces or shell metacharacters are passed through intact.
            # Exit codes are deliberately ignored (best effort, as before).
            subprocess.run(["tar", "-zxf", tar_path, "-C", cxx_cov_path],
                           stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL, check=False)
            subprocess.run(["rm", "-rf", tar_path], check=False)
            if target_name != OBJ:
                subprocess.run(["mv", os.path.join(cxx_cov_path, target_name),
                                os.path.join(cxx_cov_path, OBJ)], check=False)

    def _obtain_fuzz_corpus(self):
        """Tar the device's ``corpus`` directory and pull it to the results."""
        command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;"
        self.config.device.execute_shell_command(command)
        result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath)
        LOG.info(f"fuzz_dir = {result_save_path}")
        self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path)

    def _obtain_benchmark_result(self):
        """Pull ``<suite>.json`` into the report's ``benchmark`` tree.

        :return: the benchmark directory path; the directory is removed
            again when the JSON file did not arrive
        """
        benchmark_root_dir = os.path.abspath(
            os.path.join(self.result_rootpath, "benchmark"))
        benchmark_dir = os.path.abspath(
            os.path.join(benchmark_root_dir,
                         self.get_result_sub_save_path(),
                         self.testsuite_name))

        if not os.path.exists(benchmark_dir):
            os.makedirs(benchmark_dir)

        LOG.info("benchmark_dir = %s" % benchmark_dir)
        json_name = "%s.json" % self.testsuite_name
        self.device.pull_file(os.path.join(self.device_testpath, json_name),
                              benchmark_dir)
        if not os.path.exists(os.path.join(benchmark_dir, json_name)):
            os.rmdir(benchmark_dir)
        return benchmark_dir
522
523
524@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
525class CppTestDriver(IDriver):
526    """
527    CppTest is a Test that runs a native test package on given device.
528    """
529    # test driver config
530    config = None
531    result = ""
532
533    def __check_environment__(self, device_options):
534        if len(device_options) == 1 and device_options[0].label is None:
535            return True
536        if len(device_options) != 1 or \
537                device_options[0].label != DeviceLabelType.phone:
538            return False
539        return True
540
541    def __check_config__(self, config):
542        pass
543
544    def __result__(self):
545        return self.result if os.path.exists(self.result) else ""
546
547    def __execute__(self, request):
548        try:
549            self.config = request.config
550            self.config.target_test_path = DEFAULT_TEST_PATH
551            self.config.device = request.config.environment.devices[0]
552
553            suite_file = request.root.source.source_file
554            LOG.debug("Testsuite FilePath: %s" % suite_file)
555
556            if not suite_file:
557                LOG.error("test source '%s' not exists" %
558                          request.root.source.source_string)
559                return
560
561            if not self.config.device:
562                result = ResultManager(suite_file, self.config)
563                result.set_is_coverage(False)
564                result.make_empty_result_file(
565                    "No test device is found. ")
566                return
567            self.config.device.set_device_report_path(request.config.report_path)
568            self.config.device.device_log_collector.start_hilog_task()
569            self._init_gtest()
570            self._run_gtest(suite_file)
571
572        finally:
573            log_path = get_result_savepath(request.root.source.source_file, request.config.report_path)
574            suit_name = os.path.basename(request.root.source.source_file)
575            xml_path = os.path.join(log_path, f"{suit_name}.xml")
576            if not os.path.exists(xml_path):
577                _create_empty_result_file(xml_path, suit_name, "ERROR")
578                update_xml(request.root.source.source_file, xml_path)
579            serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns())
580            log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace(
581                ":", "_"))
582            self.config.device.device_log_collector.stop_hilog_task(
583                log_tar_file_name, module_name=request.get_module_name())
584
585    @staticmethod
586    def _alter_init(name):
587        with open(name, "rb") as f:
588            lines = f.read()
589        str_content = lines.decode("utf-8")
590
591        pattern_sharp = '^\s*#.*$(\n|\r\n)'
592        pattern_star = '/\*.*?\*/(\n|\r\n)+'
593        pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
594
595        if re.match(pattern_sharp, str_content, flags=re.M):
596            striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
597        elif re.findall(pattern_star, str_content, flags=re.S):
598            striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
599        elif re.findall(pattern_xml, str_content, flags=re.S):
600            striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
601        else:
602            striped_content = str_content
603
604        striped_bt = striped_content.encode("utf-8")
605        if os.path.exists(name):
606            os.remove(name)
607        with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
608            f.write(striped_bt)
609
610    def _init_gtest(self):
611        self.config.device.connector_command("target mount")
612        self.config.device.execute_shell_command(
613            "rm -rf %s" % self.config.target_test_path)
614        self.config.device.execute_shell_command(
615            "mkdir -p %s" % self.config.target_test_path)
616        self.config.device.execute_shell_command(
617            "mount -o rw,remount,rw /")
618        if "fuzztest" == self.config.testtype[0]:
619            self.config.device.execute_shell_command(
620                "mkdir -p %s" % os.path.join(self.config.target_test_path,
621                                             "corpus"))
622
623    def _gtest_command(self, suite_file):
624        filename = os.path.basename(suite_file)
625        test_para = self._get_test_para(self.config.testcase,
626                                        self.config.testlevel,
627                                        self.config.testtype,
628                                        self.config.target_test_path,
629                                        suite_file,
630                                        filename)
631
632        # execute testcase
633        if not self.config.coverage:
634            if self.config.random == "random":
635                seed = random.randint(1, 100)
636                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % (
637                    self.config.target_test_path,
638                    filename,
639                    filename,
640                    test_para,
641                    seed)
642            else:
643                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
644                    self.config.target_test_path,
645                    filename,
646                    filename,
647                    test_para)
648        else:
649            coverage_outpath = self.config.coverage_outpath
650            if coverage_outpath:
651                strip_num = len(coverage_outpath.strip("/").split("/"))
652            else:
653                ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json")
654                with open(ohos_config_path, 'r') as json_file:
655                    json_info = json.load(json_file)
656                    out_path = json_info.get("out_path")
657                strip_num = len(out_path.strip("/").split("/"))
658            if "fuzztest" == self.config.testtype[0]:
659                self._push_corpus_cov_if_exist(suite_file)
660                command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \
661                                        rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \
662                                        GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}"
663            else:
664                command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \
665                          "GCOV_PREFIX_STRIP=%s ./%s %s" % \
666                          (self.config.target_test_path,
667                           filename,
668                           DEFAULT_TEST_PATH,
669                           str(strip_num),
670                           filename,
671                           test_para)
672
673        if self.config.hidelog:
674            command += " > {}.log 2>&1".format(filename)
675
676        return command
677
678    def _run_gtest(self, suite_file):
679        from xdevice import Variables
680        is_coverage_test = True if self.config.coverage else False
681
682        # push testsuite file
683        self.config.device.push_file(suite_file, self.config.target_test_path)
684        self.config.device.execute_shell_command(
685            "hilog -d %s" % (os.path.join(self.config.target_test_path,
686                                          os.path.basename(suite_file)))
687        )
688        self._push_corpus_if_exist(suite_file)
689
690        # push resource files
691        resource_manager = ResourceManager()
692        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
693        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
694                                               self.config.device)
695
696        command = self._gtest_command(suite_file)
697
698        result = ResultManager(suite_file, self.config)
699        result.set_is_coverage(is_coverage_test)
700
701        try:
702            # get result
703            if self.config.hidelog:
704                return_message = ""
705                display_receiver = CollectingOutputReceiver()
706                self.config.device.execute_shell_command(
707                    command,
708                    receiver=display_receiver,
709                    timeout=TIME_OUT,
710                    retry=0)
711            else:
712                display_receiver = DisplayOutputReceiver()
713                self.config.device.execute_shell_command(
714                    command,
715                    receiver=display_receiver,
716                    timeout=TIME_OUT,
717                    retry=0)
718                return_message = display_receiver.output
719        except (ExecuteTerminate, DeviceError) as exception:
720            return_message = str(exception.args)
721
722        if self.config.hidelog:
723            self.result = result.get_test_results_hidelog(return_message)
724        else:
725            self.result = result.get_test_results(return_message)
726        update_xml(suite_file, self.result)
727
728        resource_manager.process_cleaner_data(resource_data_dic,
729                                              resource_dir,
730                                              self.config.device)
731
732    def _push_corpus_cov_if_exist(self, suite_file):
733        corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep)
734        cov_file = os.path.join(
735            sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz")
736        LOG.info("corpus_cov file :%s" % str(cov_file))
737        self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path))
738
739    def _push_corpus_if_exist(self, suite_file):
740        if "fuzztest" == self.config.testtype[0]:
741            corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus")
742            if not os.path.isdir(corpus_path):
743                return
744
745            corpus_dirs = []
746            corpus_file_list = []
747
748            for root, _, files in os.walk(corpus_path):
749                if not files:
750                    continue
751
752                corpus_dir = root.split("corpus")[-1]
753                if corpus_dir != "":
754                    corpus_dirs.append(corpus_dir)
755
756                for file in files:
757                    cp_file = os.path.normcase(os.path.join(root, file))
758                    corpus_file_list.append(cp_file)
759                    if file == "init":
760                        self._alter_init(cp_file)
761
762            # mkdir corpus files dir
763            if corpus_dirs:
764                for corpus in corpus_dirs:
765                    mkdir_corpus_command = f"shell; mkdir -p {corpus}"
766                    self.config.device.connector_command(mkdir_corpus_command)
767
768            # push corpus file
769            if corpus_file_list:
770                for corpus_file in corpus_file_list:
771                    self.config.device.push_file(corpus_file,
772                                                 os.path.join(self.config.target_test_path, "corpus"))
773
774    def _get_test_para(self,
775                       testcase,
776                       testlevel,
777                       testtype,
778                       target_test_path,
779                       suite_file,
780                       filename):
781        if "benchmark" == testtype[0]:
782            test_para = (" --benchmark_out_format=json"
783                         " --benchmark_out=%s%s.json") % (
784                            target_test_path, filename)
785            return test_para
786
787        if "" != testcase and "" == testlevel:
788            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
789        elif "" == testcase and "" != testlevel:
790            level_para = get_level_para_string(testlevel)
791            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
792        else:
793            test_para = ""
794
795        if "fuzztest" == testtype[0]:
796            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
797                suite_file), "project.xml")).get_fuzzer_config("fuzztest")
798            LOG.info("config list :%s" % str(cfg_list))
799            if self.config.coverage:
800                test_para += "corpus -runs=0" + \
801                             " -max_len=" + cfg_list[0] + \
802                             " -max_total_time=" + cfg_list[1] + \
803                             " -rss_limit_mb=" + cfg_list[2]
804            else:
805                test_para += "corpus -max_len=" + cfg_list[0] + \
806                             " -max_total_time=" + cfg_list[1] + \
807                             " -rss_limit_mb=" + cfg_list[2]
808
809        return test_para
810
811
812@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
813class JSUnitTestDriver(IDriver):
814    """
815    JSUnitTestDriver is a Test that runs a native test package on given device.
816    """
817
818    def __init__(self):
819        self.config = None
820        self.result = ""
821        self.start_time = None
822        self.ability_name = ""
823        self.package_name = ""
824        # log
825        self.hilog = None
826        self.hilog_proc = None
827
828    def __check_environment__(self, device_options):
829        pass
830
831    def __check_config__(self, config):
832        pass
833
834    def __result__(self):
835        return self.result if os.path.exists(self.result) else ""
836
837    def __execute__(self, request):
838        try:
839            LOG.info("developer_test driver")
840            self.config = request.config
841            self.config.target_test_path = DEFAULT_TEST_PATH
842            self.config.device = request.config.environment.devices[0]
843
844            suite_file = request.root.source.source_file
845            result_save_path = get_result_savepath(suite_file, self.config.report_path)
846            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
847            if not suite_file:
848                LOG.error("test source '%s' not exists" %
849                          request.root.source.source_string)
850                return
851
852            if not self.config.device:
853                result = ResultManager(suite_file, self.config)
854                result.set_is_coverage(False)
855                result.make_empty_result_file(
856                    "No test device is found")
857                return
858
859            package_name, ability_name = self._get_package_and_ability_name(
860                suite_file)
861            self.package_name = package_name
862            self.ability_name = ability_name
863            self.config.test_hap_out_path = \
864                "/data/data/%s/files/" % self.package_name
865            self.config.device.connector_command("shell hilog -r")
866
867            self.hilog = get_device_log_file(
868                request.config.report_path,
869                request.config.device.__get_serial__() + "_" + request.
870                get_module_name(),
871                "device_hilog")
872
873            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
874                                 0o755)
875
876            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
877                self.config.device.device_log_collector.add_log_address(None, self.hilog)
878                _, self.hilog_proc = self.config.device.device_log_collector.\
879                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
880                self._init_jsunit_test()
881                self._run_jsunit(suite_file, self.hilog)
882                hilog_file_pipe.flush()
883                self.generate_console_output(self.hilog, request)
884                xml_path = os.path.join(
885                    request.config.report_path, "result",
886                    '.'.join((request.get_module_name(), "xml")))
887                shutil.move(xml_path, self.result)
888                update_xml(suite_file, self.result)
889        finally:
890            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
891            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
892
893    @staticmethod
894    def _get_acts_test_para(testcase,
895                            testlevel,
896                            testtype,
897                            target_test_path,
898                            suite_file,
899                            filename):
900        if "actstest" == testtype[0]:
901            test_para = (" --actstest_out_format=json"
902                         " --actstest_out=%s%s.json") % (
903                            target_test_path, filename)
904            return test_para
905
906        if "" != testcase and "" == testlevel:
907            test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
908        elif "" == testcase and "" != testlevel:
909            level_para = get_level_para_string(testlevel)
910            test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
911        else:
912            test_para = ""
913        return test_para
914
915    @staticmethod
916    def _get_hats_test_para(testcase,
917                            testlevel,
918                            testtype,
919                            target_test_path,
920                            suite_file,
921                            filename):
922        if "hatstest" == testtype[0]:
923            test_hats_para = (" --hatstest_out_format=json"
924                         " --hatstest_out=%s%s.json") % (
925                            target_test_path, filename)
926            return test_hats_para
927
928        if "" != testcase and "" == testlevel:
929            test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
930        elif "" == testcase and "" != testlevel:
931            level_para = get_level_para_string(testlevel)
932            test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
933        else:
934            test_hats_para = ""
935        return test_hats_para
936
937    @classmethod
938    def _get_json_shell_timeout(cls, json_filepath):
939        test_timeout = 0
940        try:
941            with open(json_filepath, 'r') as json_file:
942                data_dic = json.load(json_file)
943                if not data_dic:
944                    return test_timeout
945                else:
946                    if "driver" in data_dic.keys():
947                        driver_dict = data_dic.get("driver")
948                        if driver_dict and "test-timeout" in driver_dict.keys():
949                            test_timeout = int(driver_dict["shell-timeout"]) / 1000
950                    return test_timeout
951        except JSONDecodeError:
952            return test_timeout
953        finally:
954            print(" get json shell timeout finally")
955
956    @staticmethod
957    def _get_package_and_ability_name(hap_filepath):
958        package_name = ""
959        ability_name = ""
960        if os.path.exists(hap_filepath):
961            filename = os.path.basename(hap_filepath)
962
963            # unzip the hap file
964            hap_bak_path = os.path.abspath(os.path.join(
965                os.path.dirname(hap_filepath),
966                "%s.bak" % filename))
967            zf_desc = zipfile.ZipFile(hap_filepath)
968            try:
969                zf_desc.extractall(path=hap_bak_path)
970            except RuntimeError as error:
971                print("Unzip error: ", hap_bak_path)
972            zf_desc.close()
973
974            # verify config.json file
975            app_profile_path = os.path.join(hap_bak_path, "config.json")
976            if not os.path.exists(app_profile_path):
977                print("file %s not exist" % app_profile_path)
978                return package_name, ability_name
979
980            if os.path.isdir(app_profile_path):
981                print("%s is a folder, and not a file" % app_profile_path)
982                return package_name, ability_name
983
984            # get package_name and ability_name value
985            load_dict = {}
986            with open(app_profile_path, 'r') as load_f:
987                load_dict = json.load(load_f)
988            profile_list = load_dict.values()
989            for profile in profile_list:
990                package_name = profile.get("package")
991                if not package_name:
992                    continue
993                abilities = profile.get("abilities")
994                for abilitie in abilities:
995                    abilities_name = abilitie.get("name")
996                    if abilities_name.startswith("."):
997                        ability_name = package_name + abilities_name[
998                                                      abilities_name.find("."):]
999                    else:
1000                        ability_name = abilities_name
1001                    break
1002                break
1003
1004            # delete hap_bak_path
1005            if os.path.exists(hap_bak_path):
1006                shutil.rmtree(hap_bak_path)
1007        else:
1008            print("file %s not exist" % hap_filepath)
1009        return package_name, ability_name
1010
1011    def generate_console_output(self, device_log_file, request):
1012        result_message = self.read_device_log(device_log_file)
1013
1014        report_name = request.get_module_name()
1015        parsers = get_plugin(
1016            Plugin.PARSER, CommonParserType.jsunit)
1017        if parsers:
1018            parsers = parsers[:1]
1019        for listener in request.listeners:
1020            listener.device_sn = self.config.device.device_sn
1021        parser_instances = []
1022
1023        for parser in parsers:
1024            parser_instance = parser.__class__()
1025            parser_instance.suites_name = report_name
1026            parser_instance.suite_name = report_name
1027            parser_instance.listeners = request.listeners
1028            parser_instances.append(parser_instance)
1029        handler = ShellHandler(parser_instances)
1030        process_command_ret(result_message, handler)
1031
1032    def read_device_log(self, device_log_file):
1033        result_message = ""
1034        with open(device_log_file, "r", encoding='utf-8',
1035                  errors='ignore') as file_read_pipe:
1036            while True:
1037                data = file_read_pipe.readline()
1038                if not data:
1039                    break
1040                # only filter JSApp log
1041                if data.lower().find(_ACE_LOG_MARKER) != -1:
1042                    result_message += data
1043                    if data.find("[end] run suites end") != -1:
1044                        break
1045        return result_message
1046
1047    def start_hap_execute(self):
1048        try:
1049            command = "aa start -d 123 -a %s.MainAbility -b %s" \
1050                      % (self.package_name, self.package_name)
1051            self.start_time = time.time()
1052            result_value = self.config.device.execute_shell_command(
1053                command, timeout=TIME_OUT)
1054
1055            if "success" in str(result_value).lower():
1056                LOG.info("execute %s's testcase success. result value=%s"
1057                         % (self.package_name, result_value))
1058            else:
1059                LOG.info("execute %s's testcase failed. result value=%s"
1060                         % (self.package_name, result_value))
1061
1062            _sleep_according_to_result(result_value)
1063            return_message = result_value
1064        except (ExecuteTerminate, DeviceError) as exception:
1065            return_message = exception.args
1066
1067        return return_message
1068
1069    def _init_jsunit_test(self):
1070        self.config.device.connector_command("target mount")
1071        self.config.device.execute_shell_command(
1072            "rm -rf %s" % self.config.target_test_path)
1073        self.config.device.execute_shell_command(
1074            "mkdir -p %s" % self.config.target_test_path)
1075        self.config.device.execute_shell_command(
1076            "mount -o rw,remount,rw /")
1077
1078    def _run_jsunit(self, suite_file, device_log_file):
1079        filename = os.path.basename(suite_file)
1080        _, suffix_name = os.path.splitext(filename)
1081
1082        resource_manager = ResourceManager()
1083        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
1084        if suffix_name == ".hap":
1085            json_file_path = suite_file.replace(".hap", ".json")
1086            if os.path.exists(json_file_path):
1087                timeout = self._get_json_shell_timeout(json_file_path)
1088            else:
1089                timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1090        else:
1091            timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1092        resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device)
1093        main_result = self._install_hap(suite_file)
1094        result = ResultManager(suite_file, self.config)
1095        if main_result:
1096            self._execute_hapfile_jsunittest()
1097            try:
1098                status = False
1099                actiontime = JS_TIMEOUT
1100                times = CYCLE_TIMES
1101                if timeout:
1102                    actiontime = timeout
1103                    times = 1
1104                with open(device_log_file, "r", encoding='utf-8',
1105                          errors='ignore') as file_read_pipe:
1106                    for i in range(0, times):
1107                        if status:
1108                            break
1109                        else:
1110                            time.sleep(float(actiontime))
1111                        start_time = int(time.time())
1112                        while True:
1113                            data = file_read_pipe.readline()
1114                            if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1:
1115                                LOG.info("execute testcase successfully.")
1116                                status = True
1117                                break
1118                            if int(time.time()) - start_time > 5:
1119                                break
1120            finally:
1121                _lock_screen(self.config.device)
1122                self._uninstall_hap(self.package_name)
1123        else:
1124            self.result = result.get_test_results("Error: install hap failed")
1125            LOG.error("Error: install hap failed")
1126
1127        resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device)
1128
1129    def _execute_hapfile_jsunittest(self):
1130        _unlock_screen(self.config.device)
1131        _unlock_device(self.config.device)
1132
1133        try:
1134            return_message = self.start_hap_execute()
1135        except (ExecuteTerminate, DeviceError) as exception:
1136            return_message = str(exception.args)
1137
1138        return return_message
1139
1140    def _install_hap(self, suite_file):
1141        message = self.config.device.connector_command("install %s" % suite_file)
1142        message = str(message).rstrip()
1143        if message == "" or "success" in message:
1144            return_code = True
1145            if message != "":
1146                LOG.info(message)
1147        else:
1148            return_code = False
1149            if message != "":
1150                LOG.warning(message)
1151
1152        _sleep_according_to_result(return_code)
1153        return return_code
1154
1155    def _uninstall_hap(self, package_name):
1156        return_message = self.config.device.execute_shell_command(
1157            "bm uninstall -n %s" % package_name)
1158        _sleep_according_to_result(return_message)
1159        return return_message
1160
1161
1162@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
1163class OHRustTestDriver(IDriver):
1164    def __init__(self):
1165        self.result = ""
1166        self.error_message = ""
1167        self.config = None
1168
1169    def __check_environment__(self, device_options):
1170        pass
1171
1172    def __check_config__(self, config):
1173        pass
1174
1175    def __execute__(self, request):
1176        try:
1177            LOG.debug("Start to execute open harmony rust test")
1178            self.config = request.config
1179            self.config.device = request.config.environment.devices[0]
1180            self.config.target_test_path = DEFAULT_TEST_PATH
1181            suite_file = request.root.source.source_file
1182            LOG.debug("Testsuite filepath:{}".format(suite_file))
1183
1184            if not suite_file:
1185                LOG.error("test source '{}' not exists".format(
1186                    request.root.source.source_string))
1187                return
1188
1189            result_save_path = get_result_savepath(suite_file, self.config.report_path)
1190            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
1191            self.config.device.set_device_report_path(request.config.report_path)
1192            self.config.device.device_log_collector.start_hilog_task()
1193            self._init_oh_rust()
1194            self._run_oh_rust(suite_file, request)
1195        except Exception as exception:
1196            self.error_message = exception
1197            if not getattr(exception, "error_no", ""):
1198                setattr(exception, "error_no", "03409")
1199            LOG.exception(self.error_message, exc_info=False, error_no="03409")
1200        finally:
1201            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
1202                                    time.time_ns())
1203            log_tar_file_name = "{}_{}".format(
1204                request.get_module_name(), str(serial).replace(":", "_"))
1205            self.config.device.device_log_collector.stop_hilog_task(
1206                log_tar_file_name, module_name=request.get_module_name())
1207            xml_path = os.path.join(
1208                request.config.report_path, "result",
1209                '.'.join((request.get_module_name(), "xml")))
1210            shutil.move(xml_path, self.result)
1211            self.result = check_result_report(
1212                request.config.report_path, self.result, self.error_message)
1213            update_xml(request.root.source.source_file, self.result)
1214
1215    def __result__(self):
1216        return self.result if os.path.exists(self.result) else ""
1217
1218    def _init_oh_rust(self):
1219        self.config.device.connector_command("target mount")
1220        self.config.device.execute_shell_command(
1221            "rm -rf %s" % self.config.target_test_path)
1222        self.config.device.execute_shell_command(
1223            "mkdir -p %s" % self.config.target_test_path)
1224        self.config.device.execute_shell_command(
1225            "mount -o rw,remount,rw /")
1226
1227    def _run_oh_rust(self, suite_file, request=None):
1228        self.config.device.push_file(suite_file, self.config.target_test_path)
1229        self.config.device.execute_shell_command(
1230            "hilog -d %s" % (os.path.join(self.config.target_test_path,
1231                                          os.path.basename(suite_file)))
1232        )
1233        resource_manager = ResourceManager()
1234        resource_data_dict, resource_dir = \
1235            resource_manager.get_resource_data_dic(suite_file)
1236        resource_manager.process_preparer_data(resource_data_dict,
1237                                               resource_dir,
1238                                               self.config.device)
1239        for listener in request.listeners:
1240            listener.device_sn = self.config.device.device_sn
1241
1242        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
1243        if parsers:
1244            parsers = parsers[:1]
1245        parser_instances = []
1246        for parser in parsers:
1247            parser_instance = parser.__class__()
1248            parser_instance.suite_name = request.get_module_name()
1249            parser_instance.listeners = request.listeners
1250            parser_instances.append(parser_instance)
1251        handler = ShellHandler(parser_instances)
1252        if self.config.coverage:
1253            command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format(
1254                self.config.target_test_path, os.path.basename(suite_file))
1255        else:
1256            command = "cd {}; chmod +x *; ./{}".format(
1257                self.config.target_test_path, os.path.basename(suite_file))
1258        self.config.device.execute_shell_command(
1259            command, timeout=TIME_OUT, receiver=handler, retry=0)
1260        if self.config.coverage:
1261            result = ResultManager(suite_file, self.config)
1262            result.obtain_coverage_data()
1263        resource_manager.process_cleaner_data(resource_data_dict, resource_dir,
1264                                              self.config.device)
1265