• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2024 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import shutil
22import stat
23import time
24
25from xdevice import ParamError
26from xdevice import get_device_log_file
27from xdevice import check_result_report
28from xdevice import get_kit_instances
29from xdevice import do_module_kit_setup
30from xdevice import do_module_kit_teardown
31from xdevice import get_config_value
32from xdevice import Plugin
33from xdevice import DeviceTestType
34from xdevice import IDriver
35from xdevice import get_plugin
36from xdevice import CommonParserType
37from xdevice import ShellHandler
38from xdevice import ConfigConst
39from xdevice import JsonParser
40from xdevice import TestDescription
41from xdevice import platform_logger
42
43from ohos.constants import CKit
44from ohos.executor.listener import CollectingPassListener
45from core.driver.drivers import update_xml
46from core.driver.drivers import get_result_savepath
47
# Names exported by ``from <module> import *``.
__all__ = ["OHJSUnitTestDriver", "oh_jsunit_para_parse"]

# Module-wide logger, registered under the "Drivers" name.
LOG = platform_logger("Drivers")
# Fallback assigned to config.timeout when the driver config carries no
# "shell-timeout" entry. Presumably milliseconds (i.e. 300 s) -- TODO confirm
# against the unit expected by the device's execute_shell_command().
TIME_OUT = 300 * 1000
52
53
def oh_jsunit_para_parse(runner, junit_paras):
    """
    Forward the supported user test arguments to the runner.

    Only "class", "notClass", "testType", "size", "level" and "stress" are
    passed on through ``runner.add_arg``; every other key is ignored.
    "class" and "notClass" are joined with commas, the remaining parameters
    use their first value only. "testType", "size" and "level" are dropped
    when that first value is not one of the accepted ones.

    Args:
        runner: object exposing ``add_arg(name, value)``.
        junit_paras: mapping (or iterable of key/value pairs) from parameter
            name to a list of string values.
    """
    # Accepted first values for the validated parameters.
    # NOTE(review): the previous inline comment for "level" listed 0/1/2/3/4
    # while only 0-3 are accepted; confirm whether "4" should be allowed.
    accepted_values = {
        "testType": ("function", "performance", "reliability", "security"),
        "size": ("small", "medium", "large"),
        "level": ("0", "1", "2", "3"),
    }
    for raw_name, para_values in dict(junit_paras).items():
        # The values come from the entry itself: looking the stripped name up
        # again would miss keys that carry surrounding whitespace.
        para_name = raw_name.strip()
        para_values = para_values or []
        if para_name in ("class", "notClass"):
            runner.add_arg(para_name, ",".join(para_values))
        elif not para_values:
            # Nothing to forward; indexing [0] below would raise IndexError.
            continue
        elif para_name in accepted_values:
            if para_values[0] in accepted_values[para_name]:
                runner.add_arg(para_name, para_values[0])
        elif para_name == "stress":
            runner.add_arg(para_name, para_values[0])
83
84
class OHJSUnitTestRunner:
    """
    Builds the ``aa test`` shell commands for one test suite and executes
    them on ``config.device``, feeding the output to the first registered
    parser plugin.
    """
    MAX_RETRY_TIMES = 3

    def __init__(self, config):
        self.config = config
        self.suites_name = None
        self.compile_mode = ""
        # Extra command-line arguments, keyed by argument name.
        self.arg_list = {}
        self.expect_tests_dict = {}
        self.suite_recorder = {}
        # Parser instance notified by notify_finished(); set by
        # _get_shell_handler().
        self.finished_observer = None
        self.finished = False
        self.rerun_attemp = 3
        self.retry_times = 1

    @staticmethod
    def _new_parser_instances(parser_type):
        """Return fresh instances of (at most) the first parser plugin."""
        plugins = get_plugin(Plugin.PARSER, parser_type)
        if plugins:
            plugins = plugins[:1]
        return [plugin.__class__() for plugin in plugins]

    def dry_run(self):
        """
        Execute the dry-run command and return the tests the parser found.

        Also stores the parser's ``tests_dict`` in ``expect_tests_dict``.
        """
        instances = self._new_parser_instances(
            CommonParserType.oh_jsunit_list)
        receiver = ShellHandler(instances)
        dry_run_command = self._get_dry_run_command()
        self.config.device.execute_shell_command(
            dry_run_command, timeout=self.config.timeout,
            receiver=receiver, retry=0)
        first = instances[0]
        self.expect_tests_dict = first.tests_dict
        return first.tests

    def run(self, listener):
        """Execute the test command, reporting through *listener*."""
        receiver = self._get_shell_handler(listener)
        run_command = self._get_run_command()
        self.config.device.execute_shell_command(
            run_command, timeout=self.config.timeout,
            receiver=receiver, retry=0)

    def notify_finished(self):
        """Tell the observer (if any) the task ended; consume one retry."""
        observer = self.finished_observer
        if observer:
            observer.notify_task_finished()
        self.retry_times = self.retry_times - 1

    def _get_shell_handler(self, listener):
        """Create the shell handler wrapping a parser bound to this runner."""
        instances = self._new_parser_instances(CommonParserType.oh_jsunit)
        for instance in instances:
            instance.suites_name = self.suites_name
            instance.listeners = listener
            instance.runner = self
            self.finished_observer = instance
        return ShellHandler(instances)

    def add_arg(self, name, value):
        """Register an argument; empty names or values are ignored."""
        if name and value:
            self.arg_list[name] = value

    def remove_arg(self, name):
        """Drop a previously registered argument, if present."""
        if name and name in self.arg_list:
            del self.arg_list[name]

    def get_args_command(self):
        """
        Render the registered arguments as a command-line fragment.

        "wait_time" becomes ``-w <value>``; everything else becomes
        ``-s <name> <value>``.
        """
        fragments = []
        for name, value in self.arg_list.items():
            if name == "wait_time":
                fragments.append(" -w %s " % (value,))
            else:
                fragments.append(" -s %s %s " % (name, value))
        return "".join(fragments)

    def _compose_command(self, suffix):
        """Build the ``aa test`` command line, ending with *suffix*."""
        cfg = self.config
        if cfg.package_name:
            # aa test -p ${packageName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            target = "-p {}".format(cfg.package_name)
            test_runner = "OpenHarmonyTestRunner"
        elif cfg.module_name:
            # aa test -m ${moduleName} -b ${bundleName}
            # -s unittest <test runner path>
            target = "-m {}".format(cfg.module_name)
            test_runner = self.get_oh_test_runner_path()
        else:
            return ""
        return "aa test {} -b {} -s unittest {} {}{}".format(
            target, cfg.bundle_name, test_runner,
            self.get_args_command(), suffix)

    def _get_run_command(self):
        """Command that actually runs the tests ("" without a target)."""
        return self._compose_command("")

    def _get_dry_run_command(self):
        """Command that only lists the tests ("" without a target)."""
        return self._compose_command(" -s dryRun true")

    def get_oh_test_runner_path(self):
        """Return the test runner name, which depends on the compile mode."""
        if self.compile_mode != "esmodule":
            return "OpenHarmonyTestRunner"
        return "/ets/testrunner/OpenHarmonyTestRunner"
199
200
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
class OHJSUnitTestDriver(IDriver):
    """
    Driver plugin that runs an OpenHarmony JS unit test module on the first
    device of the request's environment and produces its result report.
    """

    def __init__(self):
        self.timeout = 80 * 1000
        self.start_time = None
        # Path of the result xml; "" until __execute__ has computed it.
        self.result = ""
        # Exception raised during execution, forwarded to the report check.
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None
        self.rerun = True
        self.rerun_all = True
        # log
        self.device_log = None
        self.hilog = None
        self.log_proc = None
        self.hilog_proc = None

    def __check_environment__(self, device_options):
        """No environment checks are performed by this driver."""
        pass

    def __check_config__(self, config):
        """No configuration checks are performed by this driver."""
        pass

    def __execute__(self, request):
        """
        Run the test module described by *request* and finalize its report.

        Any exception is logged, tagged with error number "03409" when it
        carries none, stored in ``error_message`` and re-raised. Log capture
        is stopped and the report is finalized in every case.
        """
        try:
            LOG.debug("Developer_test Start execute OpenHarmony JSUnitTest")
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file
            # NOTE(review): get_result_savepath() receives suite_file before
            # the emptiness check below; confirm it copes with an empty path.
            result_save_path = get_result_savepath(suite_file, self.config.report_path)
            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")
            LOG.debug("Test case file path: %s" % suite_file)
            self.config.device.set_device_report_path(request.config.report_path)
            self.hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__() + "_" + request.get_module_name(),
                "device_hilog")

            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 0o755)
            # Wrap the descriptor right away so it is closed even when one of
            # the preparation steps below raises.
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
                self.config.device.execute_shell_command(command="hilog -r")
                if hasattr(self.config, ConfigConst.device_log) \
                        and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
                        and hasattr(self.config.device, "clear_crash_log"):
                    self.config.device.device_log_collector.clear_crash_log()
                self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
                self._run_oh_jsunit(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=True, error_no="03409")
            raise
        finally:
            try:
                self._handle_logs(request)
            finally:
                xml_path = os.path.join(
                    request.config.report_path, "result",
                    '.'.join((request.get_module_name(), "xml")))
                # A failed run may not have produced the xml at all. Moving a
                # missing file would raise here, hide the real error and skip
                # the report check below.
                if os.path.exists(xml_path):
                    shutil.move(xml_path, self.result)
                self.result = check_result_report(
                    request.config.report_path, self.result, self.error_message)
                update_xml(request.root.source.source_file, self.result)

    def _run_oh_jsunit(self, config_file, request):
        """
        Load the module config, set up the kits and run the tests.

        Raises:
            ParamError: when *config_file* does not exist (error "00102").
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            self.config.device.connector_command("target mount")
            self._start_smart_perf()
            do_module_kit_setup(request, self.kits)
            self.runner = OHJSUnitTestRunner(self.config)
            self.runner.suites_name = request.get_module_name()
            self._get_runner_config(json_config)
            if hasattr(self.config, "history_report_path") and \
                    self.config.testargs.get("test"):
                # Retry of explicitly listed cases from a previous report.
                self._do_test_retry(request.listeners, self.config.testargs)
            else:
                if self.rerun:
                    self.runner.retry_times = self.runner.MAX_RETRY_TIMES
                # execute test case
                self._do_tf_suite()
                self._make_exclude_list_file(request)
                oh_jsunit_para_parse(self.runner, self.config.testargs)
                self._do_test_run(listener=request.listeners)

        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """
        Copy the driver section of *json_config* onto ``self.config``.

        Raises:
            ParamError: when neither "package-name" nor "module-name" is
                configured (error "03201").
        """
        driver = json_config.get_driver()
        package = get_config_value('package-name', driver, False)
        module = get_config_value('module-name', driver, False)
        bundle = get_config_value('bundle-name', driver, False)
        is_rerun = get_config_value('rerun', driver, False)

        self.config.package_name = package
        self.config.module_name = module
        self.config.bundle_name = bundle
        # Rerun is enabled only by the literal string 'true'.
        self.rerun = is_rerun == 'true'

        if not package and not module:
            raise ParamError("Neither package nor module is found"
                             " in config file.", error_no="03201")
        timeout_config = get_config_value("shell-timeout",
                                          json_config.get_driver(), False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def _get_runner_config(self, json_config):
        """Pass the timeout and compile-mode settings on to the runner."""
        test_timeout = get_config_value('test-timeout',
                                        json_config.get_driver(), False)
        if test_timeout:
            self.runner.add_arg("wait_time", int(test_timeout))

        testcase_timeout = get_config_value('testcase-timeout',
                                            json_config.get_driver(), False)
        if testcase_timeout:
            self.runner.add_arg("timeout", int(testcase_timeout))
        self.runner.compile_mode = get_config_value(
            'compile-mode', json_config.get_driver(), False)

    def _do_test_run(self, listener):
        """Dry-run to collect the tests, then run with or without rerun."""
        test_to_run = self._collect_test_to_run()
        LOG.info("Collected suite count is: {}, test count is: {}".
                 format(len(self.runner.expect_tests_dict),
                        len(test_to_run) if test_to_run else 0))
        if not test_to_run or not self.rerun:
            self.runner.run(listener)
            self.runner.notify_finished()
        else:
            self._run_with_rerun(listener, test_to_run)

    def _collect_test_to_run(self):
        """Return the tests reported by the runner's dry run."""
        return self.runner.dry_run()

    def _run_tests(self, listener):
        """Run once with an extra tracking listener; return what it saw."""
        test_tracker = CollectingPassListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        self.runner.run(listener_copy)
        return test_tracker.get_current_run_results()

    def _set_class_filter(self, tests):
        """Restrict the next run to *tests* through the "class" argument."""
        self.runner.add_arg("class", ",".join(
            "%s#%s" % (test.class_name, test.test_name) for test in tests))

    def _run_with_rerun(self, listener, expected_tests):
        """First run; re-run the tests that did not report a result."""
        LOG.debug("Developer_test Ready to run with rerun, expect run: %s"
                  % len(expected_tests))
        test_run = self._run_tests(listener)
        self.runner.retry_times -= 1
        # Count computed up front: the conditional used to bind looser than
        # the "%" formatting, so an empty run logged a bare 0.
        run_count = len(test_run) if test_run else 0
        LOG.debug("Run with rerun, has run: %s" % run_count)
        if run_count < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("No tests to re-run twice,please check")
                self.runner.notify_finished()
            else:
                self._rerun_twice(expected_tests, listener)
        else:
            LOG.debug("Rerun once success")
            self.runner.notify_finished()

    def _rerun_twice(self, expected_tests, listener):
        """Second attempt, limited to the tests still missing a result."""
        self._set_class_filter(expected_tests)
        LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
        test_run = self._run_tests(listener)
        self.runner.retry_times -= 1
        run_count = len(test_run) if test_run else 0
        LOG.debug("Rerun twice, has run: %s" % run_count)
        if run_count < len(expected_tests):
            expected_tests = TestDescription.remove_test(expected_tests,
                                                         test_run)
            if not expected_tests:
                LOG.debug("No tests to re-run third,please check")
                self.runner.notify_finished()
            else:
                self._rerun_third(expected_tests, listener)
        else:
            LOG.debug("Rerun twice success")
            self.runner.notify_finished()

    def _rerun_third(self, expected_tests, listener):
        """Last attempt; its outcome is not inspected."""
        self._set_class_filter(expected_tests)
        LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
        self._run_tests(listener)
        LOG.debug("Rerun third success")
        self.runner.notify_finished()

    def _make_exclude_list_file(self, request):
        """
        Merge the exclusions listed in the "all-test-file-exclude-filter"
        json file into the "notClass" test argument.

        The filter entry is always removed from the test arguments.
        """
        if "all-test-file-exclude-filter" not in self.config.testargs:
            return
        json_file_list = self.config.testargs.pop(
            "all-test-file-exclude-filter")
        if not json_file_list:
            LOG.warning("all-test-file-exclude-filter value is empty!")
            return
        if not os.path.isfile(json_file_list[0]):
            LOG.warning(
                "[{}] is not a valid file".format(json_file_list[0]))
            return
        file_open = os.open(json_file_list[0], os.O_RDONLY,
                            stat.S_IWUSR | stat.S_IRUSR)
        with os.fdopen(file_open, "r") as file_handler:
            json_data = json.load(file_handler)
        exclude_list = json_data.get(
            DeviceTestType.oh_jsunit_test, [])
        module_name = request.get_module_name()
        filter_list = []
        for exclude in exclude_list:
            # Only the entries keyed by this module apply.
            if module_name not in exclude:
                continue
            filter_list.extend(exclude.get(module_name))
        if not isinstance(self.config.testargs, dict):
            return
        if 'notClass' in self.config.testargs:
            filter_list.extend(self.config.testargs.get('notClass', []))
        self.config.testargs.update({"notClass": filter_list})

    def _do_test_retry(self, listener, testargs):
        """
        Run only the "class#method" cases listed under ``testargs["test"]``.

        Entries that do not split into exactly two parts on "#" are skipped.
        """
        tests_dict = dict()
        case_list = list()
        for test in testargs.get("test"):
            test_item = test.split("#")
            if len(test_item) != 2:
                continue
            case_list.append(test)
            tests_dict.setdefault(test_item[0], []).append(
                TestDescription(test_item[0], test_item[1]))
        self.runner.add_arg("class", ",".join(case_list))
        self.runner.expect_tests_dict = tests_dict
        self.config.testargs.pop("test")
        self.runner.run(listener)
        self.runner.notify_finished()

    def _do_tf_suite(self):
        """Use the cases of ``config.tf_suite``, if any, as "class" arg."""
        if not hasattr(self.config, "tf_suite"):
            return
        # Reuse the value read through attribute access instead of
        # subscripting the config object a second time.
        case_list = self.config.tf_suite.get("cases", [])
        if case_list:
            self.config.testargs.update({"class": case_list})

    def _start_smart_perf(self):
        """Put the smartperf kit first in ``self.kits`` when it is enabled."""
        if not hasattr(self.config, ConfigConst.kits_in_module):
            return
        if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module):
            return
        sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
        sp_kits.target_name = self.config.bundle_name
        param_config = self.config.get(ConfigConst.kits_params).get(
            CKit.smartperf, "")
        sp_kits.__check_config__(param_config)
        self.kits.insert(0, sp_kits)

    def _handle_logs(self, request):
        """Collect crash logs when enabled, then stop the log capture."""
        collector = self.config.device.device_log_collector
        serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns())
        # ":" in the serial is replaced before it is used as a file name.
        log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
        if hasattr(self.config, ConfigConst.device_log) and \
                self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
                and hasattr(self.config.device, "start_get_crash_log"):
            collector.start_get_crash_log(
                log_tar_file_name, module_name=request.get_module_name())
        collector.remove_log_address(self.device_log, self.hilog)
        collector.stop_catch_device_log(self.log_proc)
        collector.stop_catch_device_log(self.hilog_proc)

    def __result__(self):
        """Return the result path, or "" when that file does not exist."""
        return self.result if os.path.exists(self.result) else ""
507
508