• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import re
22import shutil
23import subprocess
24import sys
25import time
26import platform
27import zipfile
28import stat
29import random
30from dataclasses import dataclass
31from json import JSONDecodeError
32
33from xdevice import DeviceTestType, check_result_report
34from xdevice import DeviceLabelType
35from xdevice import CommonParserType
36from xdevice import ExecuteTerminate
37from xdevice import DeviceError
38from xdevice import ShellHandler
39
40from xdevice import IDriver
41from xdevice import platform_logger
42from xdevice import Plugin
43from xdevice import get_plugin
44from ohos.environment.dmlib import process_command_ret
45from core.utils import get_decode
46from core.utils import get_fuzzer_path
47from core.config.resource_manager import ResourceManager
48from core.config.config_manager import FuzzerConfigManager
49
50__all__ = [
51    "CppTestDriver",
52    "JSUnitTestDriver",
53    "disable_keyguard",
54    "GTestConst"]
55
56LOG = platform_logger("Drivers")
57DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
58OBJ = "obj"
59
60TIME_OUT = 900 * 1000
61JS_TIMEOUT = 10
62CYCLE_TIMES = 30
63
64FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL
65MODES = stat.S_IWUSR | stat.S_IRUSR
66
67
68##############################################################################
69##############################################################################
70
71class CollectingOutputReceiver:
72    def __init__(self):
73        self.output = ""
74
75    def __read__(self, output):
76        self.output = "%s%s" % (self.output, output)
77
78    def __error__(self, message):
79        pass
80
81    def __done__(self, result_code="", message=""):
82        pass
83
84
85class DisplayOutputReceiver:
86    def __init__(self):
87        self.output = ""
88        self.unfinished_line = ""
89
90    def _process_output(self, output, end_mark="\n"):
91        content = output
92        if self.unfinished_line:
93            content = "".join((self.unfinished_line, content))
94            self.unfinished_line = ""
95        lines = content.split(end_mark)
96        if content.endswith(end_mark):
97            return lines[:-1]
98        else:
99            self.unfinished_line = lines[-1]
100            return lines[:-1]
101
102    def __read__(self, output):
103        self.output = "%s%s" % (self.output, output)
104        lines = self._process_output(output)
105        for line in lines:
106            line = line.strip()
107            if line:
108                LOG.info(get_decode(line))
109
110    def __error__(self, message):
111        pass
112
113    def __done__(self, result_code="", message=""):
114        pass
115
116
117@dataclass
118class GTestConst(object):
119    exec_para_filter = "--gtest_filter"
120    exec_para_level = "--gtest_testsize"
121    exec_acts_para_filter = "--jstest_filter"
122    exec_acts_para_level = "--jstest_testsize"
123
124
125def get_device_log_file(report_path, serial=None, log_name="device_log"):
126    from xdevice import Variables
127    log_path = os.path.join(report_path, Variables.report_vars.log_dir)
128    os.makedirs(log_path, exist_ok=True)
129
130    serial = serial or time.time_ns()
131    device_file_name = "{}_{}.log".format(log_name, serial)
132    device_log_file = os.path.join(log_path, device_file_name)
133    return device_log_file
134
135
136def get_level_para_string(level_string):
137    level_list = list(set(level_string.split(",")))
138    level_para_string = ""
139    for item in level_list:
140        if not item.isdigit():
141            continue
142        item = item.strip(" ")
143        level_para_string = f"{level_para_string}Level{item},"
144    level_para_string = level_para_string.strip(",")
145    return level_para_string
146
147
148def get_result_savepath(testsuit_path, result_rootpath):
149    findkey = os.sep + "tests" + os.sep
150    filedir, _ = os.path.split(testsuit_path)
151    pos = filedir.find(findkey)
152    if -1 != pos:
153        subpath = filedir[pos + len(findkey):]
154        pos1 = subpath.find(os.sep)
155        if -1 != pos1:
156            subpath = subpath[pos1 + len(os.sep):]
157            result_path = os.path.join(result_rootpath, "result", subpath)
158        else:
159            result_path = os.path.join(result_rootpath, "result")
160    else:
161        result_path = os.path.join(result_rootpath, "result")
162
163    if not os.path.exists(result_path):
164        os.makedirs(result_path)
165
166    LOG.info("result_savepath = " + result_path)
167    return result_path
168
169
170def get_test_log_savepath(result_rootpath):
171    test_log_path = os.path.join(result_rootpath, "log", "test_log")
172
173    if not os.path.exists(test_log_path):
174        os.makedirs(test_log_path)
175
176    LOG.info("test_log_savepath = {}".format(test_log_path))
177    return test_log_path
178
179
180# all testsuit common Unavailable test result xml
181def _create_empty_result_file(filepath, filename, error_message):
182    error_message = str(error_message)
183    error_message = error_message.replace("\"", "")
184    error_message = error_message.replace("<", "")
185    error_message = error_message.replace(">", "")
186    error_message = error_message.replace("&", "")
187    if filename.endswith(".hap"):
188        filename = filename.split(".")[0]
189    if not os.path.exists(filepath):
190        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
191            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
192                                       time.localtime())
193            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
194            file_desc.write(
195                '<testsuites tests="0" failures="0" '
196                'disabled="0" errors="0" timestamp="%s" '
197                'time="0" name="AllTests">\n' % time_stamp)
198            file_desc.write(
199                '  <testsuite name="%s" tests="0" failures="0" '
200                'disabled="0" errors="0" time="0.0" '
201                'unavailable="1" message="%s">\n' %
202                (filename, error_message))
203            file_desc.write('  </testsuite>\n')
204            file_desc.write('</testsuites>\n')
205    return
206
207
208def _unlock_screen(device):
209    device.execute_shell_command("svc power stayon true")
210    time.sleep(1)
211
212
213def _unlock_device(device):
214    device.execute_shell_command("input keyevent 82")
215    time.sleep(1)
216    device.execute_shell_command("wm dismiss-keyguard")
217    time.sleep(1)
218
219
220def _lock_screen(device):
221    device.execute_shell_command("svc power stayon false")
222    time.sleep(1)
223
224
225def disable_keyguard(device):
226    _unlock_screen(device)
227    _unlock_device(device)
228
229
230def _sleep_according_to_result(result):
231    if result:
232        time.sleep(1)
233
234
235def _create_fuzz_crash_file(filepath, filename):
236    if not os.path.exists(filepath):
237        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
238            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
239                                       time.localtime())
240            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
241            file_desc.write(
242                '<testsuites disabled="0" name="AllTests" '
243                'time="300" timestamp="%s" errors="0" '
244                'failures="1" tests="1">\n' % time_stamp)
245            file_desc.write(
246                '  <testsuite disabled="0" name="%s" time="300" '
247                'errors="0" failures="1" tests="1">\n' % filename)
248            file_desc.write(
249                '    <testcase name="%s" time="300" classname="%s" '
250                'status="run">\n' % (filename, filename))
251            file_desc.write(
252                '      <failure type="" '
253                'message="Fuzzer crash. See ERROR in log file">\n')
254            file_desc.write('      </failure>\n')
255            file_desc.write('    </testcase>\n')
256            file_desc.write('  </testsuite>\n')
257            file_desc.write('</testsuites>\n')
258    return
259
260
261def _create_fuzz_pass_file(filepath, filename):
262    if not os.path.exists(filepath):
263        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
264            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
265                                       time.localtime())
266            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
267            file_desc.write(
268                '<testsuites disabled="0" name="AllTests" '
269                'time="300" timestamp="%s" errors="0" '
270                'failures="0" tests="1">\n' % time_stamp)
271            file_desc.write(
272                '  <testsuite disabled="0" name="%s" time="300" '
273                'errors="0" failures="0" tests="1">\n' % filename)
274            file_desc.write(
275                '    <testcase name="%s" time="300" classname="%s" '
276                'status="run"/>\n' % (filename, filename))
277            file_desc.write('  </testsuite>\n')
278            file_desc.write('</testsuites>\n')
279    return
280
281
282def _create_fuzz_result_file(filepath, filename, error_message):
283    error_message = str(error_message)
284    error_message = error_message.replace("\"", "")
285    error_message = error_message.replace("<", "")
286    error_message = error_message.replace(">", "")
287    error_message = error_message.replace("&", "")
288    if "AddressSanitizer" in error_message:
289        LOG.error("FUZZ TEST CRASH")
290        _create_fuzz_crash_file(filepath, filename)
291    elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
292                   error_message, re.M) is not None:
293        LOG.info("FUZZ TEST PASS")
294        _create_fuzz_pass_file(filepath, filename)
295    else:
296        LOG.error("FUZZ TEST UNAVAILABLE")
297        _create_empty_result_file(filepath, filename, error_message)
298    return
299
300
301##############################################################################
302##############################################################################
303
304class ResultManager(object):
305    def __init__(self, testsuit_path, config):
306        self.testsuite_path = testsuit_path
307        self.config = config
308        self.result_rootpath = self.config.report_path
309        self.device = self.config.device
310        if testsuit_path.endswith(".hap"):
311            self.device_testpath = self.config.test_hap_out_path
312        else:
313            self.device_testpath = self.config.target_test_path
314        self.testsuite_name = os.path.basename(self.testsuite_path)
315        self.is_coverage = False
316
317    def set_is_coverage(self, is_coverage):
318        self.is_coverage = is_coverage
319
320    def get_test_results(self, error_message=""):
321        # Get test result files
322        filepath, _ = self.obtain_test_result_file()
323        if "fuzztest" == self.config.testtype[0]:
324            LOG.info("create fuzz test report")
325            _create_fuzz_result_file(filepath, self.testsuite_name,
326                                     error_message)
327            if not self.is_coverage:
328                self._obtain_fuzz_corpus()
329
330        if not os.path.exists(filepath):
331            _create_empty_result_file(filepath, self.testsuite_name,
332                                      error_message)
333        if "benchmark" == self.config.testtype[0]:
334            self._obtain_benchmark_result()
335        # Get coverage data files
336        if self.is_coverage:
337            self.obtain_coverage_data()
338
339        return filepath
340
341    def get_test_results_hidelog(self, error_message=""):
342        # Get test result files
343        result_file_path, test_log_path = self.obtain_test_result_file()
344        log_content = ""
345        if not error_message:
346            if os.path.exists(test_log_path):
347                with open(test_log_path, "r") as log:
348                    log_content = log.readlines()
349            else:
350                LOG.error("{}: Test log not exist.".format(test_log_path))
351        else:
352            log_content = error_message
353
354        if "fuzztest" == self.config.testtype[0]:
355            LOG.info("create fuzz test report")
356            _create_fuzz_result_file(result_file_path, self.testsuite_name,
357                                     log_content)
358            if not self.is_coverage:
359                self._obtain_fuzz_corpus()
360
361        if not os.path.exists(result_file_path):
362            _create_empty_result_file(result_file_path, self.testsuite_name,
363                                      log_content)
364        if "benchmark" == self.config.testtype[0]:
365            self._obtain_benchmark_result()
366        # Get coverage data files
367        if self.is_coverage:
368            self.obtain_coverage_data()
369
370        return result_file_path
371
372    def _obtain_fuzz_corpus(self):
373        command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;"
374        self.config.device.execute_shell_command(command)
375        result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath)
376        LOG.info(f"fuzz_dir = {result_save_path}")
377        self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path)
378
379    def _obtain_benchmark_result(self):
380        benchmark_root_dir = os.path.abspath(
381            os.path.join(self.result_rootpath, "benchmark"))
382        benchmark_dir = os.path.abspath(
383            os.path.join(benchmark_root_dir,
384                         self.get_result_sub_save_path(),
385                         self.testsuite_name))
386
387        if not os.path.exists(benchmark_dir):
388            os.makedirs(benchmark_dir)
389
390        LOG.info("benchmark_dir = %s" % benchmark_dir)
391        self.device.pull_file(os.path.join(self.device_testpath,
392                                           "%s.json" % self.testsuite_name), benchmark_dir)
393        if not os.path.exists(os.path.join(benchmark_dir,
394                                           "%s.json" % self.testsuite_name)):
395            os.rmdir(benchmark_dir)
396        return benchmark_dir
397
398    def get_result_sub_save_path(self):
399        find_key = os.sep + "benchmark" + os.sep
400        file_dir, _ = os.path.split(self.testsuite_path)
401        pos = file_dir.find(find_key)
402        subpath = ""
403        if -1 != pos:
404            subpath = file_dir[pos + len(find_key):]
405        LOG.info("subpath = " + subpath)
406        return subpath
407
408    def obtain_test_result_file(self):
409        result_save_path = get_result_savepath(self.testsuite_path,
410                                               self.result_rootpath)
411
412        result_file_path = os.path.join(result_save_path,
413                                        "%s.xml" % self.testsuite_name)
414
415        result_josn_file_path = os.path.join(result_save_path,
416                                             "%s.json" % self.testsuite_name)
417
418        if self.testsuite_path.endswith('.hap'):
419            remote_result_file = os.path.join(self.device_testpath,
420                                              "testcase_result.xml")
421            remote_json_result_file = os.path.join(self.device_testpath,
422                                                   "%s.json" % self.testsuite_name)
423        else:
424            remote_result_file = os.path.join(self.device_testpath,
425                                              "%s.xml" % self.testsuite_name)
426            remote_json_result_file = os.path.join(self.device_testpath,
427                                                   "%s.json" % self.testsuite_name)
428
429        if self.config.testtype[0] != "fuzztest":
430            if self.device.is_file_exist(remote_result_file):
431                self.device.pull_file(remote_result_file, result_file_path)
432            elif self.device.is_file_exist(remote_json_result_file):
433                self.device.pull_file(remote_json_result_file,
434                                      result_josn_file_path)
435                result_file_path = result_josn_file_path
436            else:
437                LOG.info("%s not exist", remote_result_file)
438
439        if self.config.hidelog:
440            remote_log_result_file = os.path.join(self.device_testpath,
441                                                  "%s.log" % self.testsuite_name)
442            test_log_save_path = get_test_log_savepath(self.result_rootpath)
443            test_log_file_path = os.path.join(test_log_save_path,
444                                              "%s.log" % self.testsuite_name)
445            self.device.pull_file(remote_log_result_file, test_log_file_path)
446            return result_file_path, test_log_file_path
447
448        return result_file_path, ""
449
450    def make_empty_result_file(self, error_message=""):
451        result_savepath = get_result_savepath(self.testsuite_path,
452                                              self.result_rootpath)
453        result_filepath = os.path.join(result_savepath, "%s.xml" %
454                                       self.testsuite_name)
455        if not os.path.exists(result_filepath):
456            _create_empty_result_file(result_filepath,
457                                      self.testsuite_name, error_message)
458
459    def is_exist_target_in_device(self, path, target):
460        if platform.system() == "Windows":
461            command = '\"ls -l %s | grep %s\"' % (path, target)
462        else:
463            command = "ls -l %s | grep %s" % (path, target)
464
465        check_result = False
466        stdout_info = self.device.execute_shell_command(command)
467        if stdout_info != "" and stdout_info.find(target) != -1:
468            check_result = True
469        return check_result
470
471    def obtain_coverage_data(self):
472        cov_root_dir = os.path.abspath(os.path.join(
473            self.result_rootpath,
474            "..",
475            "coverage",
476            "data",
477            "exec"))
478
479
480        tests_path = self.config.testcases_path
481        test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0]
482        cxx_cov_path = os.path.abspath(os.path.join(
483            self.result_rootpath,
484            "..",
485            "coverage",
486            "data",
487            "cxx",
488            self.testsuite_name + '_' + test_type))
489
490        if os.path.basename(self.testsuite_name).startswith("rust_"):
491            target_name = "lib.unstripped"
492        else:
493            target_name = OBJ
494        if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
495            if not os.path.exists(cxx_cov_path):
496                os.makedirs(cxx_cov_path)
497            self.config.device.execute_shell_command(
498                "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name))
499            src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name)
500            self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT)
501            tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)
502            if platform.system() == "Windows":
503                process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True)
504                process.communicate()
505                os.remove(tar_path)
506                os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ))
507            else:
508                subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" %
509                                 (tar_path, cxx_cov_path), shell=True).communicate()
510                subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate()
511                if target_name != OBJ:
512                    subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name),
513                                                   os.path.join(cxx_cov_path, OBJ)), shell=True).communicate()
514
515
516##############################################################################
517##############################################################################
518
519@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
520class CppTestDriver(IDriver):
521    """
522    CppTest is a Test that runs a native test package on given device.
523    """
524    # test driver config
525    config = None
526    result = ""
527
528    def __check_environment__(self, device_options):
529        if len(device_options) == 1 and device_options[0].label is None:
530            return True
531        if len(device_options) != 1 or \
532                device_options[0].label != DeviceLabelType.phone:
533            return False
534        return True
535
536    def __check_config__(self, config):
537        pass
538
539    def __result__(self):
540        return self.result if os.path.exists(self.result) else ""
541
542    def __execute__(self, request):
543        try:
544            self.config = request.config
545            self.config.target_test_path = DEFAULT_TEST_PATH
546            self.config.device = request.config.environment.devices[0]
547
548            suite_file = request.root.source.source_file
549            LOG.debug("Testsuite FilePath: %s" % suite_file)
550
551            if not suite_file:
552                LOG.error("test source '%s' not exists" %
553                          request.root.source.source_string)
554                return
555
556            if not self.config.device:
557                result = ResultManager(suite_file, self.config)
558                result.set_is_coverage(False)
559                result.make_empty_result_file(
560                    "No test device is found. ")
561                return
562            self.config.device.set_device_report_path(request.config.report_path)
563            self.config.device.device_log_collector.start_hilog_task()
564            self._init_gtest()
565            self._run_gtest(suite_file)
566
567        finally:
568            serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns())
569            log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace(
570                ":", "_"))
571            self.config.device.device_log_collector.stop_hilog_task(log_tar_file_name)
572
573    def _init_gtest(self):
574        self.config.device.connector_command("target mount")
575        self.config.device.execute_shell_command(
576            "rm -rf %s" % self.config.target_test_path)
577        self.config.device.execute_shell_command(
578            "mkdir -p %s" % self.config.target_test_path)
579        self.config.device.execute_shell_command(
580            "mount -o rw,remount,rw /")
581        if "fuzztest" == self.config.testtype[0]:
582            self.config.device.execute_shell_command(
583                "mkdir -p %s" % os.path.join(self.config.target_test_path,
584                                             "corpus"))
585
586    def _gtest_command(self, suite_file):
587        filename = os.path.basename(suite_file)
588        test_para = self._get_test_para(self.config.testcase,
589                                        self.config.testlevel,
590                                        self.config.testtype,
591                                        self.config.target_test_path,
592                                        suite_file,
593                                        filename)
594
595        # execute testcase
596        if not self.config.coverage:
597            if self.config.random == "random":
598                seed = random.randint(1, 100)
599                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % (
600                    self.config.target_test_path,
601                    filename,
602                    filename,
603                    test_para,
604                    seed)
605            else:
606                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
607                    self.config.target_test_path,
608                    filename,
609                    filename,
610                    test_para)
611        else:
612            coverage_outpath = self.config.coverage_outpath
613            if coverage_outpath:
614                strip_num = len(coverage_outpath.strip("/").split("/"))
615            else:
616                ohos_config_path = os.path.join(sys.source_code_root_path, "ohos_config.json")
617                with open(ohos_config_path, 'r') as json_file:
618                    json_info = json.load(json_file)
619                    out_path = json_info.get("out_path")
620                strip_num = len(out_path.strip("/").split("/"))
621            if "fuzztest" == self.config.testtype[0]:
622                self._push_corpus_cov_if_exist(suite_file)
623                command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \
624                                        rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX=.; \
625                                        GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}"
626            else:
627                command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \
628                          "GCOV_PREFIX_STRIP=%s ./%s %s" % \
629                          (self.config.target_test_path,
630                           filename,
631                           str(strip_num),
632                           filename,
633                           test_para)
634
635        if self.config.hidelog:
636            command += " > {}.log 2>&1".format(filename)
637
638        return command
639
640    def _run_gtest(self, suite_file):
641        from xdevice import Variables
642        is_coverage_test = True if self.config.coverage else False
643
644        # push testsuite file
645        self.config.device.push_file(suite_file, self.config.target_test_path)
646        self._push_corpus_if_exist(suite_file)
647
648        # push resource files
649        resource_manager = ResourceManager()
650        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
651        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
652                                               self.config.device)
653
654        command = self._gtest_command(suite_file)
655
656        result = ResultManager(suite_file, self.config)
657        result.set_is_coverage(is_coverage_test)
658
659        try:
660            # get result
661            if self.config.hidelog:
662                return_message = ""
663                display_receiver = CollectingOutputReceiver()
664                self.config.device.execute_shell_command(
665                    command,
666                    receiver=display_receiver,
667                    timeout=TIME_OUT,
668                    retry=0)
669            else:
670                display_receiver = DisplayOutputReceiver()
671                self.config.device.execute_shell_command(
672                    command,
673                    receiver=display_receiver,
674                    timeout=TIME_OUT,
675                    retry=0)
676                return_message = display_receiver.output
677        except (ExecuteTerminate, DeviceError) as exception:
678            return_message = str(exception.args)
679
680        if self.config.hidelog:
681            self.result = result.get_test_results_hidelog(return_message)
682        else:
683            self.result = result.get_test_results(return_message)
684
685        resource_manager.process_cleaner_data(resource_data_dic,
686                                              resource_dir,
687                                              self.config.device)
688
689    @staticmethod
690    def _alter_init(name):
691        with open(name, "rb") as f:
692            lines = f.read()
693        str_content = lines.decode("utf-8")
694
695        pattern_sharp = '^\s*#.*$(\n|\r\n)'
696        pattern_star = '/\*.*?\*/(\n|\r\n)+'
697        pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
698
699        if re.match(pattern_sharp, str_content, flags=re.M):
700            striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
701        elif re.findall(pattern_star, str_content, flags=re.S):
702            striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
703        elif re.findall(pattern_xml, str_content, flags=re.S):
704            striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
705        else:
706            striped_content = str_content
707
708        striped_bt = striped_content.encode("utf-8")
709        if os.path.exists(name):
710            os.remove(name)
711        with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
712            f.write(striped_bt)
713
714    def _push_corpus_cov_if_exist(self, suite_file):
715        cov_file = suite_file + "_corpus.tar.gz"
716        LOG.info("corpus_cov file :%s" % str(cov_file))
717        self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path))
718
719    def _push_corpus_if_exist(self, suite_file):
720        if "fuzztest" == self.config.testtype[0]:
721            corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus")
722            if not os.path.isdir(corpus_path):
723                return
724
725            corpus_dirs = []
726            corpus_file_list = []
727
728            for root, _, files in os.walk(corpus_path):
729                if not files:
730                    continue
731
732                corpus_dir = root.split("corpus")[-1]
733                if corpus_dir != "":
734                    corpus_dirs.append(corpus_dir)
735
736                for file in files:
737                    cp_file = os.path.normcase(os.path.join(root, file))
738                    corpus_file_list.append(cp_file)
739                    if file == "init":
740                        self._alter_init(cp_file)
741
742            # mkdir corpus files dir
743            if corpus_dirs:
744                for corpus in corpus_dirs:
745                    mkdir_corpus_command = f"shell; mkdir -p {corpus}"
746                    self.config.device.connector_command(mkdir_corpus_command)
747
748            # push corpus file
749            if corpus_file_list:
750                for corpus_file in corpus_file_list:
751                    self.config.device.push_file(corpus_file,
752                                                 os.path.join(self.config.target_test_path, "corpus"))
753
754    def _get_test_para(self,
755                       testcase,
756                       testlevel,
757                       testtype,
758                       target_test_path,
759                       suite_file,
760                       filename):
761        if "benchmark" == testtype[0]:
762            test_para = (" --benchmark_out_format=json"
763                         " --benchmark_out=%s%s.json") % (
764                            target_test_path, filename)
765            return test_para
766
767        if "" != testcase and "" == testlevel:
768            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
769        elif "" == testcase and "" != testlevel:
770            level_para = get_level_para_string(testlevel)
771            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
772        else:
773            test_para = ""
774
775        if "fuzztest" == testtype[0]:
776            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
777                suite_file), "project.xml")).get_fuzzer_config("fuzztest")
778            LOG.info("config list :%s" % str(cfg_list))
779            if self.config.coverage:
780                test_para += "corpus -runs=0" + \
781                             " -max_len=" + cfg_list[0] + \
782                             " -max_total_time=" + cfg_list[1] + \
783                             " -rss_limit_mb=" + cfg_list[2]
784            else:
785                test_para += "corpus -max_len=" + cfg_list[0] + \
786                             " -max_total_time=" + cfg_list[1] + \
787                             " -rss_limit_mb=" + cfg_list[2]
788
789        return test_para
790
791
792##############################################################################
793##############################################################################
794
795@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
796class JSUnitTestDriver(IDriver):
797    """
798    JSUnitTestDriver is a Test that runs a native test package on given device.
799    """
800
801    def __init__(self):
802        self.config = None
803        self.result = ""
804        self.start_time = None
805        self.ability_name = ""
806        self.package_name = ""
807        # log
808        self.hilog = None
809        self.hilog_proc = None
810
811    def __check_environment__(self, device_options):
812        pass
813
814    def __check_config__(self, config):
815        pass
816
817    def __result__(self):
818        return self.result if os.path.exists(self.result) else ""
819
820    def __execute__(self, request):
821        try:
822            LOG.info("developertest driver")
823            self.config = request.config
824            self.config.target_test_path = DEFAULT_TEST_PATH
825            self.config.device = request.config.environment.devices[0]
826
827            suite_file = request.root.source.source_file
828            result_save_path = get_result_savepath(suite_file, self.config.report_path)
829            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
830            if not suite_file:
831                LOG.error("test source '%s' not exists" %
832                          request.root.source.source_string)
833                return
834
835            if not self.config.device:
836                result = ResultManager(suite_file, self.config)
837                result.set_is_coverage(False)
838                result.make_empty_result_file(
839                    "No test device is found")
840                return
841
842            package_name, ability_name = self._get_package_and_ability_name(
843                suite_file)
844            self.package_name = package_name
845            self.ability_name = ability_name
846            self.config.test_hap_out_path = \
847                "/data/data/%s/files/" % self.package_name
848            self.config.device.connector_command("shell hilog -r")
849
850            self.hilog = get_device_log_file(
851                request.config.report_path,
852                request.config.device.__get_serial__() + "_" + request.
853                get_module_name(),
854                "device_hilog")
855
856            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
857                                 0o755)
858
859            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
860                self.config.device.device_log_collector.add_log_address(None, self.hilog)
861                _, self.hilog_proc = self.config.device.device_log_collector.\
862                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
863                self._init_jsunit_test()
864                self._run_jsunit(suite_file, self.hilog)
865                hilog_file_pipe.flush()
866                self.generate_console_output(self.hilog, request)
867                xml_path = os.path.join(
868                    request.config.report_path, "result",
869                    '.'.join((request.get_module_name(), "xml")))
870                shutil.move(xml_path, self.result)
871        finally:
872            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
873            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
874
875    def _init_jsunit_test(self):
876        self.config.device.connector_command("target mount")
877        self.config.device.execute_shell_command(
878            "rm -rf %s" % self.config.target_test_path)
879        self.config.device.execute_shell_command(
880            "mkdir -p %s" % self.config.target_test_path)
881        self.config.device.execute_shell_command(
882            "mount -o rw,remount,rw /")
883
884    def _run_jsunit(self, suite_file, device_log_file):
885        filename = os.path.basename(suite_file)
886        _, suffix_name = os.path.splitext(filename)
887
888        resource_manager = ResourceManager()
889        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
890        if suffix_name == ".hap":
891            json_file_path = suite_file.replace(".hap", ".json")
892            if os.path.exists(json_file_path):
893                timeout = self._get_json_shell_timeout(json_file_path)
894            else:
895                timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
896        else:
897            timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
898        resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device)
899        main_result = self._install_hap(suite_file)
900        result = ResultManager(suite_file, self.config)
901        if main_result:
902            self._execute_hapfile_jsunittest()
903            try:
904                status = False
905                actiontime = JS_TIMEOUT
906                times = CYCLE_TIMES
907                if timeout:
908                    actiontime = timeout
909                    times = 1
910                device_log_file_open = os.open(device_log_file, os.O_RDONLY, stat.S_IWUSR | stat.S_IRUSR)
911                with os.fdopen(device_log_file_open, "r", encoding='utf-8') \
912                        as file_read_pipe:
913                    for i in range(0, times):
914                        if status:
915                            break
916                        else:
917                            time.sleep(float(actiontime))
918                        start_time = int(time.time())
919                        while True:
920                            data = file_read_pipe.readline()
921                            if data.find("JSApp:") != -1 and data.find("[end] run suites end") != -1:
922                                LOG.info("execute testcase successfully.")
923                                status = True
924                                break
925                            if int(time.time()) - start_time > 5:
926                                break
927            finally:
928                _lock_screen(self.config.device)
929                self._uninstall_hap(self.package_name)
930        else:
931            self.result = result.get_test_results("Error: install hap failed")
932            LOG.error("Error: install hap failed")
933
934        resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device)
935
936    def generate_console_output(self, device_log_file, request):
937        result_message = self.read_device_log(device_log_file)
938
939        report_name = request.get_module_name()
940        parsers = get_plugin(
941            Plugin.PARSER, CommonParserType.jsunit)
942        if parsers:
943            parsers = parsers[:1]
944        for listener in request.listeners:
945            listener.device_sn = self.config.device.device_sn
946        parser_instances = []
947
948        for parser in parsers:
949            parser_instance = parser.__class__()
950            parser_instance.suites_name = report_name
951            parser_instance.suite_name = report_name
952            parser_instance.listeners = request.listeners
953            parser_instances.append(parser_instance)
954        handler = ShellHandler(parser_instances)
955        process_command_ret(result_message, handler)
956
957    def read_device_log(self, device_log_file):
958        device_log_file_open = os.open(device_log_file, os.O_RDONLY,
959                                       stat.S_IWUSR | stat.S_IRUSR)
960
961        result_message = ""
962        with os.fdopen(device_log_file_open, "r", encoding='utf-8') \
963                as file_read_pipe:
964            while True:
965                data = file_read_pipe.readline()
966                if not data:
967                    break
968                # only filter JSApp log
969                if data.find("JSApp:") != -1:
970                    result_message += data
971                    if data.find("[end] run suites end") != -1:
972                        break
973        return result_message
974
975    def _execute_hapfile_jsunittest(self):
976        _unlock_screen(self.config.device)
977        _unlock_device(self.config.device)
978
979        try:
980            return_message = self.start_hap_execute()
981        except (ExecuteTerminate, DeviceError) as exception:
982            return_message = str(exception.args)
983
984        return return_message
985
986    def _install_hap(self, suite_file):
987        message = self.config.device.connector_command("install %s" % suite_file)
988        message = str(message).rstrip()
989        if message == "" or "success" in message:
990            return_code = True
991            if message != "":
992                LOG.info(message)
993        else:
994            return_code = False
995            if message != "":
996                LOG.warning(message)
997
998        _sleep_according_to_result(return_code)
999        return return_code
1000
1001    def start_hap_execute(self):
1002        try:
1003            command = "aa start -d 123 -a %s.MainAbility -b %s" \
1004                      % (self.package_name, self.package_name)
1005            self.start_time = time.time()
1006            result_value = self.config.device.execute_shell_command(
1007                command, timeout=TIME_OUT)
1008
1009            if "success" in str(result_value).lower():
1010                LOG.info("execute %s's testcase success. result value=%s"
1011                         % (self.package_name, result_value))
1012            else:
1013                LOG.info("execute %s's testcase failed. result value=%s"
1014                         % (self.package_name, result_value))
1015
1016            _sleep_according_to_result(result_value)
1017            return_message = result_value
1018        except (ExecuteTerminate, DeviceError) as exception:
1019            return_message = exception.args
1020
1021        return return_message
1022
1023    def _uninstall_hap(self, package_name):
1024        return_message = self.config.device.execute_shell_command(
1025            "bm uninstall -n %s" % package_name)
1026        _sleep_according_to_result(return_message)
1027        return return_message
1028
1029    @staticmethod
1030    def _get_acts_test_para(testcase,
1031                            testlevel,
1032                            testtype,
1033                            target_test_path,
1034                            suite_file,
1035                            filename):
1036        if "actstest" == testtype[0]:
1037            test_para = (" --actstest_out_format=json"
1038                         " --actstest_out=%s%s.json") % (
1039                            target_test_path, filename)
1040            return test_para
1041
1042        if "" != testcase and "" == testlevel:
1043            test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
1044        elif "" == testcase and "" != testlevel:
1045            level_para = get_level_para_string(testlevel)
1046            test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
1047        else:
1048            test_para = ""
1049        return test_para
1050
1051    @staticmethod
1052    def _get_hats_test_para(testcase,
1053                            testlevel,
1054                            testtype,
1055                            target_test_path,
1056                            suite_file,
1057                            filename):
1058        if "hatstest" == testtype[0]:
1059            test_hats_para = (" --hatstest_out_format=json"
1060                         " --hatstest_out=%s%s.json") % (
1061                            target_test_path, filename)
1062            return test_hats_para
1063
1064        if "" != testcase and "" == testlevel:
1065            test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
1066        elif "" == testcase and "" != testlevel:
1067            level_para = get_level_para_string(testlevel)
1068            test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
1069        else:
1070            test_hats_para = ""
1071        return test_hats_para
1072
1073    @classmethod
1074    def _get_json_shell_timeout(cls, json_filepath):
1075        test_timeout = 300
1076        try:
1077            with open(json_filepath, 'r') as json_file:
1078                data_dic = json.load(json_file)
1079                if not data_dic:
1080                    return test_timeout
1081                else:
1082                    if "driver" in data_dic.keys():
1083                        driver_dict = data_dic.get("driver")
1084                        if driver_dict and "test-timeout" in driver_dict.keys():
1085                            test_timeout = int(driver_dict["shell-timeout"]) / 1000
1086                    return test_timeout
1087        except JSONDecodeError:
1088            return test_timeout
1089        finally:
1090            print(" get json shell timeout finally")
1091
1092    @staticmethod
1093    def _get_package_and_ability_name(hap_filepath):
1094        package_name = ""
1095        ability_name = ""
1096        if os.path.exists(hap_filepath):
1097            filename = os.path.basename(hap_filepath)
1098
1099            # unzip the hap file
1100            hap_bak_path = os.path.abspath(os.path.join(
1101                os.path.dirname(hap_filepath),
1102                "%s.bak" % filename))
1103            zf_desc = zipfile.ZipFile(hap_filepath)
1104            try:
1105                zf_desc.extractall(path=hap_bak_path)
1106            except RuntimeError as error:
1107                print("Unzip error:", hap_bak_path)
1108            zf_desc.close()
1109
1110            # verify config.json file
1111            app_profile_path = os.path.join(hap_bak_path, "config.json")
1112            if not os.path.exists(app_profile_path):
1113                print("file %s not exist" % app_profile_path)
1114                return package_name, ability_name
1115
1116            if os.path.isdir(app_profile_path):
1117                print("%s is a folder, and not a file" % app_profile_path)
1118                return package_name, ability_name
1119
1120            # get package_name and ability_name value
1121            load_dict = {}
1122            with open(app_profile_path, 'r') as load_f:
1123                load_dict = json.load(load_f)
1124            profile_list = load_dict.values()
1125            for profile in profile_list:
1126                package_name = profile.get("package")
1127                if not package_name:
1128                    continue
1129                abilities = profile.get("abilities")
1130                for abilitie in abilities:
1131                    abilities_name = abilitie.get("name")
1132                    if abilities_name.startswith("."):
1133                        ability_name = package_name + abilities_name[
1134                                                      abilities_name.find("."):]
1135                    else:
1136                        ability_name = abilities_name
1137                    break
1138                break
1139
1140            # delete hap_bak_path
1141            if os.path.exists(hap_bak_path):
1142                shutil.rmtree(hap_bak_path)
1143        else:
1144            print("file %s not exist" % hap_filepath)
1145        return package_name, ability_name
1146
1147
1148@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
1149class OHRustTestDriver(IDriver):
1150    def __init__(self):
1151        self.result = ""
1152        self.error_message = ""
1153        self.config = None
1154
1155    def __check_environment__(self, device_options):
1156        pass
1157
1158    def __check_config__(self, config):
1159        pass
1160
1161    def __execute__(self, request):
1162        try:
1163            LOG.debug("Start to execute open harmony rust test")
1164            self.config = request.config
1165            self.config.device = request.config.environment.devices[0]
1166            self.config.target_test_path = DEFAULT_TEST_PATH
1167
1168            suite_file = request.root.source.source_file
1169            LOG.debug("Testsuite filepath:{}".format(suite_file))
1170
1171            if not suite_file:
1172                LOG.error("test source '{}' not exists".format(
1173                    request.root.source.source_string))
1174                return
1175
1176            self.result = "{}.xml".format(
1177                os.path.join(request.config.report_path,
1178                             "result", request.get_module_name()))
1179            self.config.device.set_device_report_path(request.config.report_path)
1180            self.config.device.device_log_collector.start_hilog_task()
1181            self._init_oh_rust()
1182            self._run_oh_rust(suite_file, request)
1183        except Exception as exception:
1184            self.error_message = exception
1185            if not getattr(exception, "error_no", ""):
1186                setattr(exception, "error_no", "03409")
1187            LOG.exception(self.error_message, exc_info=False, error_no="03409")
1188        finally:
1189            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
1190                                    time.time_ns())
1191            log_tar_file_name = "{}_{}".format(
1192                request.get_module_name(), str(serial).replace(":", "_"))
1193            self.config.device.device_log_collector.stop_hilog_task(
1194                log_tar_file_name)
1195            self.result = check_result_report(
1196                request.config.report_path, self.result, self.error_message)
1197
1198    def _init_oh_rust(self):
1199        self.config.device.connector_command("target mount")
1200        self.config.device.execute_shell_command(
1201            "mount -o rw,remount,rw /")
1202
1203    def _run_oh_rust(self, suite_file, request=None):
1204        self.config.device.push_file(suite_file, self.config.target_test_path)
1205        resource_manager = ResourceManager()
1206        resource_data_dict, resource_dir = \
1207            resource_manager.get_resource_data_dic(suite_file)
1208        resource_manager.process_preparer_data(resource_data_dict,
1209                                               resource_dir,
1210                                               self.config.device)
1211        for listener in request.listeners:
1212            listener.device_sn = self.config.device.device_sn
1213
1214        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
1215        if parsers:
1216            parsers = parsers[:1]
1217        parser_instances = []
1218        for parser in parsers:
1219            parser_instance = parser.__class__()
1220            parser_instance.suite_name = request.get_module_name()
1221            parser_instance.listeners = request.listeners
1222            parser_instances.append(parser_instance)
1223        handler = ShellHandler(parser_instances)
1224        if self.config.coverage:
1225            command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format(
1226                self.config.target_test_path, os.path.basename(suite_file))
1227        else:
1228            command = "cd {}; chmod +x *; ./{}".format(
1229                self.config.target_test_path, os.path.basename(suite_file))
1230        self.config.device.execute_shell_command(
1231            command, timeout=TIME_OUT, receiver=handler, retry=0)
1232        if self.config.coverage:
1233            result = ResultManager(suite_file, self.config)
1234            result.obtain_coverage_data()
1235        resource_manager.process_cleaner_data(resource_data_dict, resource_dir,
1236                                              self.config.device)
1237
1238    def __result__(self):
1239        return self.result if os.path.exists(self.result) else ""
1240