• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import re
22import shutil
23import subprocess
24import time
25import platform
26import zipfile
27import stat
28from dataclasses import dataclass
29from json import JSONDecodeError
30
31from xdevice import DeviceTestType
32from xdevice import DeviceLabelType
33from xdevice import CommonParserType
34from xdevice import ExecuteTerminate
35from xdevice import DeviceError
36from xdevice import ShellHandler
37
38from xdevice import IDriver
39from xdevice import platform_logger
40from xdevice import Plugin
41from xdevice import get_plugin
42from ohos.environment.dmlib import process_command_ret
43from core.utils import get_decode
44from core.utils import get_fuzzer_path
45from core.config.resource_manager import ResourceManager
46from core.config.config_manager import FuzzerConfigManager
47
48
# Names exported by ``from <module> import *``.
__all__ = [
    "CppTestDriver",
    "JSUnitTestDriver",
    "disable_keyguard",
    "GTestConst",
]

# Logger shared by every function and driver in this module.
LOG = platform_logger("Drivers")

# Directory on the device that suites are pushed to and executed from.
DEFAULT_TEST_PATH = "/{}/{}/".format("data", "test")

# Timeout handed to device shell / pull calls
# (presumably milliseconds - TODO confirm against the device API).
TIME_OUT = 900 * 1000
# Default number of seconds slept before each scan of the hilog file.
JS_TIMEOUT = 10
# Default number of sleep-and-scan rounds when no explicit timeout is given.
CYCLE_TIMES = 30
61
62
63##############################################################################
64##############################################################################
65
66
class DisplayOutputReceiver:
    """Receiver that accumulates shell output and logs each finished line.

    Attributes:
        output: Everything passed to ``__read__`` so far, concatenated.
        unfinished_line: Trailing text of the last chunk that was not yet
            terminated by the end mark; it is prepended to the next chunk.
    """

    def __init__(self):
        self.output = ""
        self.unfinished_line = ""

    def _process_output(self, output, end_mark="\n"):
        """Split a chunk into complete lines, carrying over a partial tail.

        Args:
            output: The newly received text.
            end_mark: Separator that terminates a line.

        Returns:
            The list of complete lines (without the end mark). Text after
            the last end mark is kept in ``self.unfinished_line``.
        """
        pending, self.unfinished_line = self.unfinished_line, ""
        text = pending + output if pending else output
        pieces = text.split(end_mark)
        if not text.endswith(end_mark):
            # The last piece has no terminator yet; keep it for next time.
            self.unfinished_line = pieces[-1]
        return pieces[:-1]

    def __read__(self, output):
        """Append ``output`` to the transcript and log its complete lines."""
        self.output = f"{self.output}{output}"
        for raw_line in self._process_output(output):
            stripped = raw_line.strip()
            if stripped:
                LOG.info(get_decode(stripped))

    def __error__(self, message):
        """Receiver hook for errors; intentionally does nothing."""
        return None

    def __done__(self, result_code="", message=""):
        """Receiver hook for completion; intentionally does nothing."""
        return None
97
98
@dataclass
class GTestConst:
    """Names of the command-line switches passed to test executables."""

    # Switches used for gtest binaries: test-case filter and test level.
    exec_para_filter = "--gtest_filter"
    exec_para_level = "--gtest_testsize"
    # Equivalent switches with the "jstest" prefix (not referenced in the
    # visible part of this module).
    exec_acts_para_filter = "--jstest_filter"
    exec_acts_para_level = "--jstest_testsize"
105
106
def get_device_log_file(report_path, serial=None, log_name="device_log"):
    """Build the path of a device log file inside the report's log folder.

    The log folder (``Variables.report_vars.log_dir`` below ``report_path``)
    is created when missing.

    Args:
        report_path: Root directory of the current report.
        serial: Suffix for the file name; when falsy the current time in
            nanoseconds is used instead.
        log_name: Prefix of the file name.

    Returns:
        Path of the form ``<log dir>/<log_name>_<serial>.log``.
    """
    from xdevice import Variables

    log_dir = os.path.join(report_path, Variables.report_vars.log_dir)
    os.makedirs(log_dir, exist_ok=True)

    suffix = serial if serial else time.time_ns()
    return os.path.join(log_dir, f"{log_name}_{suffix}.log")
116
117
def get_level_para_string(level_string):
    """Convert a comma separated level list into a ``LevelN,...`` string.

    Entries are stripped of surrounding whitespace *before* they are
    validated, so ``"1, 2"`` yields both levels. Entries that are not purely
    digits are ignored, duplicates are dropped, and the first-seen order of
    the input is preserved so the result is deterministic.

    Args:
        level_string: Levels such as ``"0,1,2"``.

    Returns:
        A string such as ``"Level0,Level1,Level2"``; empty when no valid
        level was given.
    """
    stripped_items = (item.strip() for item in level_string.split(","))
    # dict.fromkeys removes duplicates while keeping insertion order.
    unique_levels = dict.fromkeys(
        item for item in stripped_items if item.isdigit())
    return ",".join(f"Level{level}" for level in unique_levels)
128
129
def get_result_savepath(testsuit_path, result_rootpath):
    """Return (and create) the directory a suite's result file is saved in.

    The directory is ``<result_rootpath>/result``. When the suite's folder
    contains a ``tests`` path component, everything that follows the first
    sub-folder after ``tests`` is appended, so the result tree mirrors the
    layout of the test tree.

    Args:
        testsuit_path: Host path of the test suite file.
        result_rootpath: Root directory of the report.

    Returns:
        The path of the (now existing) result directory.
    """
    suite_dir = os.path.dirname(testsuit_path)
    result_path = os.path.join(result_rootpath, "result")

    _, marker, after_tests = suite_dir.partition(os.sep + "tests" + os.sep)
    if marker:
        # Skip the first folder below "tests" and keep the remainder.
        _, separator, remainder = after_tests.partition(os.sep)
        if separator:
            result_path = os.path.join(result_path, remainder)

    if not os.path.exists(result_path):
        os.makedirs(result_path)

    LOG.info("result_savepath = " + result_path)
    return result_path
150
151
152# all testsuit common Unavailable test result xml
def _create_empty_result_file(filepath, filename, error_message):
    """Write a placeholder result XML marking the suite as unavailable.

    Nothing is written when ``filepath`` already exists.

    Args:
        filepath: Destination of the XML file.
        filename: Suite name; for ``.hap`` files only the text before the
            first dot is used.
        error_message: Reason stored in the ``message`` attribute. The
            characters ``"``, ``<``, ``>`` and ``&`` are removed from it.
    """
    message = str(error_message).translate(str.maketrans("", "", '"<>&'))
    suite_name = filename.split(".")[0] if filename.endswith(".hap") \
        else filename
    if os.path.exists(filepath):
        return

    time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    xml_lines = [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<testsuites tests="0" failures="0" '
        'disabled="0" errors="0" timestamp="%s" '
        'time="0" name="AllTests">\n' % time_stamp,
        '  <testsuite name="%s" tests="0" failures="0" '
        'disabled="0" errors="0" time="0.0" '
        'unavailable="1" message="%s">\n' % (suite_name, message),
        '  </testsuite>\n',
        '</testsuites>\n',
    ]
    with open(filepath, "w", encoding='utf-8') as file_desc:
        file_desc.writelines(xml_lines)
    return
178
179
def _unlock_screen(device):
    """Send ``svc power stayon true`` to the device, then wait one second."""
    stay_on_command = "svc power stayon true"
    device.execute_shell_command(stay_on_command)
    time.sleep(1)
183
184
def _unlock_device(device):
    """Send key event 82 and dismiss the keyguard, pausing 1 s after each."""
    for shell_command in ("input keyevent 82", "wm dismiss-keyguard"):
        device.execute_shell_command(shell_command)
        time.sleep(1)
190
191
def _lock_screen(device):
    """Send ``svc power stayon false`` to the device, then wait one second."""
    stay_off_command = "svc power stayon false"
    device.execute_shell_command(stay_off_command)
    time.sleep(1)
195
196
def disable_keyguard(device):
    """Run the screen stay-on step and then the unlock step on ``device``."""
    for step in (_unlock_screen, _unlock_device):
        step(device)
200
201
def _sleep_according_to_result(result):
    """Pause for one second when ``result`` is truthy; else return at once."""
    if not result:
        return
    time.sleep(1)
205
def _create_fuzz_crash_file(filepath, filename):
    """Write a result XML reporting one failed test case named ``filename``.

    The failure message is fixed ("Fuzzer crash. See ERROR in log file") and
    all durations are written as 300. Nothing is written when ``filepath``
    already exists.

    Args:
        filepath: Destination of the XML file.
        filename: Used as suite name, test case name and class name.
    """
    if os.path.exists(filepath):
        return

    time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    xml_lines = [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<testsuites disabled="0" name="AllTests" '
        'time="300" timestamp="%s" errors="0" '
        'failures="1" tests="1">\n' % time_stamp,
        '  <testsuite disabled="0" name="%s" time="300" '
        'errors="0" failures="1" tests="1">\n' % filename,
        '    <testcase name="%s" time="300" classname="%s" '
        'status="run">\n' % (filename, filename),
        '      <failure type="" '
        'message="Fuzzer crash. See ERROR in log file">\n',
        '      </failure>\n',
        '    </testcase>\n',
        '  </testsuite>\n',
        '</testsuites>\n',
    ]
    with open(filepath, "w", encoding='utf-8') as file_desc:
        file_desc.writelines(xml_lines)
    return
230
def _create_fuzz_pass_file(filepath, filename):
    """Write a result XML reporting one passed test case named ``filename``.

    All durations are written as 300. Nothing is written when ``filepath``
    already exists.

    Args:
        filepath: Destination of the XML file.
        filename: Used as suite name, test case name and class name.
    """
    if os.path.exists(filepath):
        return

    time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    xml_lines = [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<testsuites disabled="0" name="AllTests" '
        'time="300" timestamp="%s" errors="0" '
        'failures="0" tests="1">\n' % time_stamp,
        '  <testsuite disabled="0" name="%s" time="300" '
        'errors="0" failures="0" tests="1">\n' % filename,
        '    <testcase name="%s" time="300" classname="%s" '
        'status="run"/>\n' % (filename, filename),
        '  </testsuite>\n',
        '</testsuites>\n',
    ]
    with open(filepath, "w", encoding='utf-8') as file_desc:
        file_desc.writelines(xml_lines)
    return
250
def _create_fuzz_result_file(filepath, filename, error_message):
    """Classify fuzzer output and write the matching result XML.

    The output is treated as a crash when it mentions ``AddressSanitizer``,
    as a pass when it contains a ``Done <n> runs in <m> second`` summary,
    and as unavailable otherwise.

    Args:
        filepath: Destination of the XML file.
        filename: Name of the fuzz test suite.
        error_message: Captured output of the fuzzer run. The characters
            ``"``, ``<``, ``>`` and ``&`` are removed before it is inspected.
    """
    message = str(error_message).translate(str.maketrans("", "", '"<>&'))

    if "AddressSanitizer" in message:
        LOG.error("FUZZ TEST CRASH")
        _create_fuzz_crash_file(filepath, filename)
        return

    finished = re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
                         message, re.M)
    if finished is not None:
        LOG.info("FUZZ TEST PASS")
        _create_fuzz_pass_file(filepath, filename)
        return

    LOG.error("FUZZ TEST UNAVAILABLE")
    _create_empty_result_file(filepath, filename, message)
    return
268##############################################################################
269##############################################################################
270
class ResultManager(object):
    """Collect result, benchmark and coverage files for one test suite.

    The manager pulls the files a suite produced on the device into the
    report directory and, when no result file is available, writes a
    placeholder so the suite still appears in the report.
    """

    def __init__(self, testsuit_path, config):
        """Remember the suite and the run configuration.

        Args:
            testsuit_path: Host path of the test suite file.
            config: Run configuration. ``report_path`` and ``device`` are
                read here, plus ``test_hap_out_path`` for ``.hap`` suites or
                ``target_test_path`` for everything else.
        """
        self.testsuite_path = testsuit_path
        self.config = config
        self.result_rootpath = self.config.report_path
        self.device = self.config.device
        if testsuit_path.endswith(".hap"):
            self.device_testpath = self.config.test_hap_out_path
        else:
            self.device_testpath = self.config.target_test_path
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable collection of coverage data."""
        self.is_coverage = is_coverage

    def get_test_results(self, error_message=""):
        """Fetch the suite's result file, creating a placeholder if needed.

        Args:
            error_message: Output or error text of the run. It is classified
                for fuzz tests and stored in the placeholder otherwise.

        Returns:
            Host path of the result file.
        """
        # Get test result files
        filepath = self.obtain_test_result_file()
        if "fuzztest" == self.config.testtype[0]:
            LOG.info("create fuzz test report")
            _create_fuzz_result_file(filepath, self.testsuite_name,
                                     error_message)
            return filepath
        if not os.path.exists(filepath):
            _create_empty_result_file(filepath, self.testsuite_name,
                                      error_message)
        if "benchmark" == self.config.testtype[0]:
            self._obtain_benchmark_result()
        # Get coverage data files
        if self.is_coverage:
            self.obtain_coverage_data()

        return filepath

    def _obtain_benchmark_result(self):
        """Pull the suite's benchmark JSON into the report's benchmark tree.

        Returns:
            The host directory the JSON was pulled into. The directory is
            removed again when the pull produced no file.
        """
        benchmark_root_dir = os.path.abspath(
            os.path.join(self.result_rootpath, "benchmark"))
        benchmark_dir = os.path.abspath(
            os.path.join(benchmark_root_dir,
                         self.get_result_sub_save_path(),
                         self.testsuite_name))

        if not os.path.exists(benchmark_dir):
            os.makedirs(benchmark_dir)

        LOG.info("benchmark_dir = %s" % benchmark_dir)
        json_name = "%s.json" % self.testsuite_name
        self.device.pull_file(os.path.join(self.device_testpath, json_name),
                              benchmark_dir)
        if not os.path.exists(os.path.join(benchmark_dir, json_name)):
            os.rmdir(benchmark_dir)
        return benchmark_dir

    def get_result_sub_save_path(self):
        """Return the part of the suite's folder that follows ``benchmark``.

        Returns:
            The sub path, or an empty string when the suite's folder has no
            ``benchmark`` path component.
        """
        find_key = os.sep + "benchmark" + os.sep
        file_dir, _ = os.path.split(self.testsuite_path)
        pos = file_dir.find(find_key)
        subpath = ""
        if -1 != pos:
            subpath = file_dir[pos + len(find_key):]
        LOG.info("subpath = " + subpath)
        return subpath

    def obtain_test_result_file(self):
        """Pull the result file from the device and return its host path.

        For every test type except ``fuzztest`` the XML result is pulled
        when it exists on the device; otherwise the JSON result is pulled
        and its path is returned instead.

        Returns:
            Host path of the XML (or JSON) result file. The file may not
            exist when nothing could be pulled.
        """
        result_save_path = get_result_savepath(self.testsuite_path,
                                               self.result_rootpath)
        result_file_path = os.path.join(result_save_path,
                                        "%s.xml" % self.testsuite_name)
        result_json_file_path = os.path.join(result_save_path,
                                             "%s.json" % self.testsuite_name)

        # hap suites write a fixed XML name; other suites use their own name.
        if self.testsuite_path.endswith('.hap'):
            remote_xml_name = "testcase_result.xml"
        else:
            remote_xml_name = "%s.xml" % self.testsuite_name
        remote_result_file = os.path.join(self.device_testpath,
                                          remote_xml_name)
        remote_json_result_file = os.path.join(
            self.device_testpath, "%s.json" % self.testsuite_name)

        if self.config.testtype[0] != "fuzztest":
            if self.device.is_file_exist(remote_result_file):
                self.device.pull_file(remote_result_file, result_file_path)
            elif self.device.is_file_exist(remote_json_result_file):
                self.device.pull_file(remote_json_result_file,
                                      result_json_file_path)
                result_file_path = result_json_file_path
            else:
                LOG.info("%s not exist", remote_result_file)

        return result_file_path

    def make_empty_result_file(self, error_message=""):
        """Write the "unavailable" placeholder result unless one exists."""
        result_savepath = get_result_savepath(self.testsuite_path,
                                              self.result_rootpath)
        result_filepath = os.path.join(result_savepath,
                                       "%s.xml" % self.testsuite_name)
        if not os.path.exists(result_filepath):
            _create_empty_result_file(result_filepath,
                                      self.testsuite_name, error_message)

    def is_exist_target_in_device(self, path, target):
        """Report whether ``ls -l path | grep target`` prints ``target``.

        Args:
            path: Directory on the device to list.
            target: Name to look for in the listing.

        Returns:
            True when the command output contains ``target``; False
            otherwise, including when the device returns no output at all.
        """
        command = "ls -l %s | grep %s" % (path, target)
        if platform.system() == "Windows":
            # On Windows hosts the pipeline is wrapped in double quotes.
            command = '\"%s\"' % command

        stdout_info = self.device.execute_shell_command(command)
        return bool(stdout_info) and stdout_info.find(target) != -1

    def obtain_coverage_data(self):
        """Pull the device's ``obj`` folder into the coverage tree.

        The folder is archived on the device, pulled to
        ``<report>/../coverage/data/cxx/<suite>_<testtype>``, extracted
        there and the archive is deleted afterwards.
        """
        target_name = "obj"
        cxx_cov_path = os.path.abspath(os.path.join(
            self.result_rootpath,
            "..",
            "coverage",
            "data",
            "cxx",
            self.testsuite_name + '_' + self.config.testtype[0]))

        if not self.is_exist_target_in_device(DEFAULT_TEST_PATH,
                                              target_name):
            return

        os.makedirs(cxx_cov_path, exist_ok=True)
        archive_name = "%s.tar.gz" % target_name
        self.config.device.execute_shell_command(
            "cd %s; tar -czf %s %s" % (DEFAULT_TEST_PATH, archive_name,
                                       target_name))
        src_file_tar = os.path.join(DEFAULT_TEST_PATH, archive_name)
        self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True,
                              timeout=TIME_OUT)
        self._extract_and_remove_archive(
            os.path.join(cxx_cov_path, archive_name), cxx_cov_path)

    @staticmethod
    def _extract_and_remove_archive(tar_path, dest_dir):
        """Extract ``tar_path`` into ``dest_dir`` and then delete the archive.

        Extraction runs to completion before the archive is removed, so the
        deletion can never race with the ``tar`` process. The command is
        passed as an argument list (no shell), so paths containing spaces
        or shell metacharacters are handled safely.
        """
        try:
            completed = subprocess.run(
                ["tar", "-zxf", tar_path, "-C", dest_dir],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False)
            if completed.returncode != 0:
                LOG.error("extract %s failed, tar exit code %s" %
                          (tar_path, completed.returncode))
        except OSError as error:
            # Raised when the tar executable itself cannot be started.
            LOG.error("extract %s failed: %s" % (tar_path, error))
        finally:
            if os.path.exists(tar_path):
                os.remove(tar_path)
421
422
423##############################################################################
424##############################################################################
425
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
class CppTestDriver(IDriver):
    """
    CppTest is a Test that runs a native test package on given device.
    """
    # test driver config
    config = None
    result = ""

    def __check_environment__(self, device_options):
        """Accept exactly one device whose label is unset or ``phone``."""
        if len(device_options) != 1:
            return False
        label = device_options[0].label
        return label is None or label == DeviceLabelType.phone

    def __check_config__(self, config):
        """No configuration checks are performed."""
        pass

    def __result__(self):
        """Return the result file path, or "" when that file is missing."""
        return self.result if os.path.exists(self.result) else ""

    def __execute__(self, request):
        """Run the suite referenced by ``request`` on the first device.

        When the request has no source file an error is logged; when no
        device is available a placeholder result file is written. Otherwise
        the hilog task is started, the device is prepared and the suite is
        executed. The hilog task is stopped on the way out whenever a device
        is available.
        """
        try:
            self.config = request.config
            self.config.target_test_path = DEFAULT_TEST_PATH
            self.config.device = request.config.environment.devices[0]

            suite_file = request.root.source.source_file
            LOG.debug("Testsuite FilePath: %s" % suite_file)

            if not suite_file:
                LOG.error("test source '%s' not exists" %
                          request.root.source.source_string)
                return

            if not self.config.device:
                result = ResultManager(suite_file, self.config)
                result.set_is_coverage(False)
                result.make_empty_result_file(
                    "No test device is found. ")
                return

            self.config.device.set_device_report_path(
                request.config.report_path)
            self.config.device.device_log_collector.start_hilog_task()
            self._init_gtest()
            self._run_gtest(suite_file)

        finally:
            # The device may be missing (no device found, or the setup above
            # failed before it was assigned). Touching it unconditionally
            # would raise AttributeError here and hide the original error.
            device = getattr(self.config, "device", None)
            if device:
                serial = "{}_{}".format(str(device.__get_serial__()),
                                        time.time_ns())
                log_tar_file_name = "{}_{}".format(
                    request.get_module_name(),
                    str(serial).replace(":", "_"))
                device.device_log_collector.stop_hilog_task(
                    log_tar_file_name)

    def _init_gtest(self):
        """Recreate the test directory on the device and remount ``/`` rw.

        For fuzz tests a ``corpus`` sub-directory is created as well.
        """
        self.config.device.connector_command("target mount")
        self.config.device.execute_shell_command(
            "rm -rf %s" % self.config.target_test_path)
        self.config.device.execute_shell_command(
            "mkdir -p %s" % self.config.target_test_path)
        self.config.device.execute_shell_command(
            "mount -o rw,remount,rw /")
        if "fuzztest" == self.config.testtype[0]:
            self.config.device.execute_shell_command(
                "mkdir -p %s" % os.path.join(self.config.target_test_path,
                                             "corpus"))

    def _run_gtest(self, suite_file):
        """Push the suite and its resources, run it and collect the result.

        The path of the collected result file is stored in ``self.result``.
        """
        filename = os.path.basename(suite_file)
        test_para = self._get_test_para(self.config.testcase,
                                        self.config.testlevel,
                                        self.config.testtype,
                                        self.config.target_test_path,
                                        suite_file,
                                        filename)
        is_coverage_test = bool(self.config.coverage)

        # push testsuite file
        self.config.device.push_file(suite_file, self.config.target_test_path)
        self._push_corpus_if_exist(suite_file)

        # push resource files
        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = \
            resource_manager.get_resource_data_dic(suite_file)
        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
                                               self.config.device)

        # execute testcase
        if not self.config.coverage:
            command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
                self.config.target_test_path,
                filename,
                filename,
                test_para)
        else:
            # GCOV_PREFIX_STRIP is set to the number of "/"-separated
            # components of the coverage output path, minus one.
            coverage_outpath = self.config.coverage_outpath
            strip_num = len(coverage_outpath.split("/")) - 1
            command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \
                "GCOV_PREFIX_STRIP=%s ./%s %s" % \
                (self.config.target_test_path,
                 filename,
                 str(strip_num),
                 filename,
                 test_para)

        result = ResultManager(suite_file, self.config)
        result.set_is_coverage(is_coverage_test)

        try:
            # get result
            display_receiver = DisplayOutputReceiver()
            self.config.device.execute_shell_command(
                command,
                receiver=display_receiver,
                timeout=TIME_OUT,
                retry=0)
            return_message = display_receiver.output
        except (ExecuteTerminate, DeviceError) as exception:
            return_message = str(exception.args)

        self.result = result.get_test_results(return_message)
        resource_manager.process_cleaner_data(resource_data_dic,
                                              resource_dir,
                                              self.config.device)

    def _push_corpus_if_exist(self, suite_file):
        """Push the fuzzer's corpus files to the device (fuzz tests only).

        Does nothing for other test types or when the fuzzer has no
        ``corpus`` directory on the host.
        """
        if "fuzztest" != self.config.testtype[0]:
            return

        corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus")
        if not os.path.isdir(corpus_path):
            return

        corpus_dirs = []
        corpus_file_list = []

        for root, _, files in os.walk(corpus_path):
            if not files:
                continue

            corpus_dir = root.split("corpus")[-1]
            if corpus_dir != "":
                corpus_dirs.append(corpus_dir)

            for file in files:
                corpus_file_list.append(os.path.normcase(
                    os.path.join(root, file)))

        # mkdir corpus files dir
        # NOTE(review): the directory is the host path suffix after
        # "corpus" and is not joined to the device test path, while every
        # file below is pushed into the flat "<test path>/corpus" folder -
        # confirm this is intended.
        for corpus in corpus_dirs:
            mkdir_corpus_command = f"shell; mkdir -p {corpus}"
            self.config.device.connector_command(mkdir_corpus_command)

        # push corpus file
        for corpus_file in corpus_file_list:
            self.config.device.push_file(
                corpus_file,
                os.path.join(self.config.target_test_path, "corpus"))

    @staticmethod
    def _get_test_para(testcase,
                       testlevel,
                       testtype,
                       target_test_path,
                       suite_file,
                       filename):
        """Build the command-line arguments for the test executable.

        Args:
            testcase: Test-case filter, or "" for none.
            testlevel: Comma separated level list, or "" for none.
            testtype: Sequence whose first item names the test type.
            target_test_path: Test directory on the device.
            suite_file: Host path of the suite (used to locate the fuzzer
                project file).
            filename: File name of the suite.

        Returns:
            The argument string. Benchmark suites only get the JSON output
            switches. Otherwise a filter or a level switch is produced (none
            when both or neither are given) and, for fuzz tests, the corpus
            directory and fuzzer limits are appended.
        """
        if "benchmark" == testtype[0]:
            test_para = (" --benchmark_out_format=json"
                         " --benchmark_out=%s%s.json") % (
                            target_test_path, filename)
            return test_para

        if "" != testcase and "" == testlevel:
            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
        elif "" == testcase and "" != testlevel:
            level_para = get_level_para_string(testlevel)
            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
        else:
            test_para = ""

        if "fuzztest" == testtype[0]:
            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
                suite_file), "project.xml")).get_fuzzer_config("fuzztest")
            LOG.info("config list :%s" % str(cfg_list))
            fuzz_para = "corpus -max_len=" + cfg_list[0] + \
                        " -max_total_time=" + cfg_list[1] + \
                        " -rss_limit_mb=" + cfg_list[2]
            # Separate the fuzzer arguments from a preceding switch; without
            # the space they would be glued onto its value.
            test_para = "%s %s" % (test_para, fuzz_para) if test_para \
                else fuzz_para
        return test_para
616
617
618##############################################################################
619##############################################################################
620
621@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
622class JSUnitTestDriver(IDriver):
    """
    JSUnitTestDriver is a Test that runs a JS unit test hap package on given
    device.
    """
626
627    def __init__(self):
628        self.config = None
629        self.result = ""
630        self.start_time = None
631        self.ability_name = ""
632        self.package_name = ""
633        self.hilog = None
634        self.hilog_proc = None
635
636    def __check_environment__(self, device_options):
637        pass
638
639    def __check_config__(self, config):
640        pass
641
642    def __result__(self):
643        return self.result if os.path.exists(self.result) else ""
644
645    def __execute__(self, request):
646        try:
647            LOG.info("developertest driver")
648            self.result = os.path.join(
649                request.config.report_path, "result",
650                '.'.join((request.get_module_name(), "xml")))
651            self.config = request.config
652            self.config.target_test_path = DEFAULT_TEST_PATH
653            self.config.device = request.config.environment.devices[0]
654
655            suite_file = request.root.source.source_file
656            if not suite_file:
657                LOG.error("test source '%s' not exists" %
658                          request.root.source.source_string)
659                return
660
661            if not self.config.device:
662                result = ResultManager(suite_file, self.config)
663                result.set_is_coverage(False)
664                result.make_empty_result_file(
665                    "No test device is found")
666                return
667
668            package_name, ability_name = self._get_package_and_ability_name(
669                suite_file)
670            self.package_name = package_name
671            self.ability_name = ability_name
672            self.config.test_hap_out_path = \
673                "/data/data/%s/files/" % self.package_name
674            self.config.device.connector_command("shell hilog -r")
675
676            self.hilog = get_device_log_file(
677                request.config.report_path,
678                request.config.device.__get_serial__() + "_" + request.
679                get_module_name(),
680                "device_hilog")
681
682            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
683                                 0o755)
684
685            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
686                self.config.device.device_log_collector.add_log_address(None, self.hilog)
687                _, self.hilog_proc = self.config.device.device_log_collector. \
688                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
689                self._init_jsunit_test()
690                self._run_jsunit(suite_file, self.hilog)
691                hilog_file_pipe.flush()
692                self.generate_console_output(self.hilog, request)
693        finally:
694            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
695            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
696
697    def _init_jsunit_test(self):
698        self.config.device.connector_command("target mount")
699        self.config.device.execute_shell_command(
700            "rm -rf %s" % self.config.target_test_path)
701        self.config.device.execute_shell_command(
702            "mkdir -p %s" % self.config.target_test_path)
703        self.config.device.execute_shell_command(
704            "mount -o rw,remount,rw /")
705
    def _run_jsunit(self, suite_file, device_log_file):
        """Install the hap, start it and wait for the suite's end marker.

        Args:
            suite_file: Host path of the test suite file.
            device_log_file: Host path of the hilog capture file that is
                polled for the ``[end] run suites end`` marker.

        When installation fails, a placeholder result is stored in
        ``self.result``; on the success path ``self.result`` is not
        reassigned here.
        """
        filename = os.path.basename(suite_file)
        _, suffix_name = os.path.splitext(filename)

        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
        # The wait timeout comes from a ".json" file next to the hap when one
        # exists, otherwise from the suite's resource data.
        if suffix_name == ".hap":
            json_file_path = suite_file.replace(".hap", ".json")
            if os.path.exists(json_file_path):
                timeout = self._get_json_shell_timeout(json_file_path)
            else:
                timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
        else:
            timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
        resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device)
        main_result = self._install_hap(suite_file)
        result = ResultManager(suite_file, self.config)
        if main_result:
            self._execute_hapfile_jsunittest()
            try:
                status = False
                # Defaults: up to CYCLE_TIMES rounds, sleeping JS_TIMEOUT
                # seconds before each. An explicit timeout replaces this with
                # a single round that sleeps for the whole timeout.
                actiontime = JS_TIMEOUT
                times = CYCLE_TIMES
                if timeout:
                    actiontime = timeout
                    times = 1
                device_log_file_open = os.open(device_log_file, os.O_RDONLY, stat.S_IWUSR | stat.S_IRUSR)
                with os.fdopen(device_log_file_open, "r", encoding='utf-8') \
                        as file_read_pipe:
                    for i in range(0, times):
                        if status:
                            break
                        else:
                            time.sleep(float(actiontime))
                        start_time = int(time.time())
                        # Scan newly written log lines for the end marker,
                        # giving up on this round after more than 5 seconds.
                        # NOTE(review): readline() returns "" at end of file,
                        # so this loop appears to spin without sleeping until
                        # the 5 second limit passes - confirm.
                        while True:
                            data = file_read_pipe.readline()
                            if data.find("JSApp:") != -1 and data.find("[end] run suites end") != -1:
                                LOG.info("execute testcase successfully.")
                                status = True
                                break
                            if int(time.time()) - start_time > 5:
                                break
            finally:
                # Always restore the screen setting and remove the package.
                _lock_screen(self.config.device)
                self._uninstall_hap(self.package_name)
        else:
            self.result = result.get_test_results("Error: install hap failed")
            LOG.error("Error: install hap failed")

        resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device)
757
758    def generate_console_output(self, device_log_file, request):
759        result_message = self.read_device_log(device_log_file)
760
761        report_name = request.get_module_name()
762        parsers = get_plugin(
763            Plugin.PARSER, CommonParserType.jsunit)
764        if parsers:
765            parsers = parsers[:1]
766        for listener in request.listeners:
767            listener.device_sn = self.config.device.device_sn
768        parser_instances = []
769
770        for parser in parsers:
771            parser_instance = parser.__class__()
772            parser_instance.suites_name = report_name
773            parser_instance.suite_name = report_name
774            parser_instance.listeners = request.listeners
775            parser_instances.append(parser_instance)
776        handler = ShellHandler(parser_instances)
777        process_command_ret(result_message, handler)
778
779    def read_device_log(self, device_log_file):
780        device_log_file_open = os.open(device_log_file, os.O_RDONLY,
781                                       stat.S_IWUSR | stat.S_IRUSR)
782
783        result_message = ""
784        with os.fdopen(device_log_file_open, "r", encoding='utf-8') \
785                as file_read_pipe:
786            while True:
787                data = file_read_pipe.readline()
788                if not data:
789                    break
790                # only filter JSApp log
791                if data.find("JSApp:") != -1:
792                    result_message += data
793                    if data.find("[end] run suites end") != -1:
794                        break
795        return result_message
796
797    def _execute_hapfile_jsunittest(self):
798        _unlock_screen(self.config.device)
799        _unlock_device(self.config.device)
800
801        try:
802            return_message = self.start_hap_execute()
803        except (ExecuteTerminate, DeviceError) as exception:
804            return_message = str(exception.args)
805
806        return return_message
807
808    def _install_hap(self, suite_file):
809        message = self.config.device.connector_command("install %s" % suite_file)
810        message = str(message).rstrip()
811        if message == "" or "success" in message:
812            return_code = True
813            if message != "":
814                LOG.info(message)
815        else:
816            return_code = False
817            if message != "":
818                LOG.warning(message)
819
820        _sleep_according_to_result(return_code)
821        return return_code
822
823    def start_hap_execute(self):
824        try:
825            command = "aa start -d 123 -a %s.MainAbility -b %s" \
826                      % (self.package_name, self.package_name)
827            self.start_time = time.time()
828            result_value = self.config.device.execute_shell_command(
829                command, timeout=TIME_OUT)
830
831            if "success" in str(result_value).lower():
832                LOG.info("execute %s's testcase success. result value=%s"
833                         % (self.package_name, result_value))
834            else:
835                LOG.info("execute %s's testcase failed. result value=%s"
836                         % (self.package_name, result_value))
837
838            _sleep_according_to_result(result_value)
839            return_message = result_value
840        except (ExecuteTerminate, DeviceError) as exception:
841            return_message = exception.args
842
843        return return_message
844
845    def _uninstall_hap(self, package_name):
846        return_message = self.config.device.execute_shell_command(
847            "bm uninstall -n %s" % package_name)
848        _sleep_according_to_result(return_message)
849        return return_message
850
851    @staticmethod
852    def _get_acts_test_para(testcase,
853                       testlevel,
854                       testtype,
855                       target_test_path,
856                       suite_file,
857                       filename):
858        if "actstest" == testtype[0]:
859            test_para = (" --actstest_out_format=json"
860                         " --actstest_out=%s%s.json") % (
861                            target_test_path, filename)
862            return test_para
863
864        if "" != testcase and "" == testlevel:
865            test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
866        elif "" == testcase and "" != testlevel:
867            level_para = get_level_para_string(testlevel)
868            test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
869        else:
870            test_para = ""
871        return test_para
872
873    @classmethod
874    def _get_json_shell_timeout(cls, json_filepath):
875        test_timeout = 300
876        try:
877            with open(json_filepath, 'r') as json_file:
878                data_dic = json.load(json_file)
879                if not data_dic:
880                    return test_timeout
881                else:
882                    if "driver" in data_dic.keys():
883                        driver_dict = data_dic.get("driver")
884                        if driver_dict and "test-timeout" in driver_dict.keys():
885                            test_timeout = int(driver_dict["shell-timeout"]) / 1000
886                    return test_timeout
887        except JSONDecodeError:
888            return test_timeout
889        finally:
890            print(" get json shell timeout finally")
891
892
893    @staticmethod
894    def _get_package_and_ability_name(hap_filepath):
895        package_name = ""
896        ability_name = ""
897        if os.path.exists(hap_filepath):
898            filename = os.path.basename(hap_filepath)
899
900            #unzip the hap file
901            hap_bak_path = os.path.abspath(os.path.join(
902                os.path.dirname(hap_filepath),
903                "%s.bak" % filename))
904            zf_desc = zipfile.ZipFile(hap_filepath)
905            try:
906                zf_desc.extractall(path=hap_bak_path)
907            except RuntimeError as error:
908                print(error)
909            zf_desc.close()
910
911            #verify config.json file
912            app_profile_path = os.path.join(hap_bak_path, "config.json")
913            if not os.path.exists(app_profile_path):
914                print("file %s not exist" % app_profile_path)
915                return package_name, ability_name
916
917            if os.path.isdir(app_profile_path):
918                print("%s is a folder, and not a file" % app_profile_path)
919                return package_name, ability_name
920
921            #get package_name and ability_name value
922            load_dict = {}
923            with open(app_profile_path, 'r') as load_f:
924                load_dict = json.load(load_f)
925            profile_list = load_dict.values()
926            for profile in profile_list:
927                package_name = profile.get("package")
928                if not package_name:
929                    continue
930                abilities = profile.get("abilities")
931                for abilitie in abilities:
932                    abilities_name = abilitie.get("name")
933                    if abilities_name.startswith("."):
934                        ability_name = package_name + abilities_name[
935                                       abilities_name.find("."):]
936                    else:
937                        ability_name = abilities_name
938                    break
939                break
940
941            #delete hap_bak_path
942            if os.path.exists(hap_bak_path):
943                shutil.rmtree(hap_bak_path)
944        else:
945            print("file %s not exist" % hap_filepath)
946        return package_name, ability_name
947