#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

19import os
20
21from xdevice import ParamError
22from xdevice import IDriver
23from xdevice import platform_logger
24from xdevice import Plugin
25from xdevice import get_plugin
26from xdevice import JsonParser
27from xdevice import ShellHandler
28from xdevice import TestDescription
29from xdevice import get_device_log_file
30from xdevice import check_result_report
31from xdevice import get_kit_instances
32from xdevice import get_config_value
33from xdevice import do_module_kit_setup
34from xdevice import do_module_kit_teardown
35
36from xdevice_extension._core.constants import DeviceTestType
37from xdevice_extension._core.constants import CommonParserType
38from xdevice_extension._core.constants import FilePermission
39from xdevice_extension._core.executor.listener import CollectingPassListener
40from xdevice_extension._core.exception import ShellCommandUnresponsiveException
41from xdevice_extension._core.testkit.kit import oh_jsunit_para_parse
42
# Driver classes exported by this module.
__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver"]

# Value assigned to config.timeout by the drivers in this module when the
# driver section of the test config has no "shell-timeout" entry.
# NOTE(review): presumably milliseconds (300 s) -- confirm against the
# device's execute_shell_command contract.
TIME_OUT = 300 * 1000

# Logger shared by the drivers and runners defined in this module.
LOG = platform_logger("OpenHarmony")
48
49
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
class OHKernelTestDriver(IDriver):
    """
    Driver for the OpenHarmony kernel test type.

    Parses the module's JSON config, sets up the configured kits, runs the
    on-device ``runtest`` script through OHKernelTestRunner and captures the
    device log into the report directory while the run is in progress.
    """

    def __init__(self):
        self.timeout = 30 * 1000
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Run the kernel test described by ``request``.

        Any exception is logged, tagged with error_no "03409" when it does
        not already carry one, and re-raised. The result report is checked
        via check_result_report() whether or not the run succeeded.
        """
        try:
            LOG.debug("Start to Execute OpenHarmony Kernel Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file

            self.result = "%s.xml" % \
                          os.path.join(request.config.report_path,
                                       "result", request.get_module_name())
            hilog = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__(),
                "device_hilog")

            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 FilePermission.mode_755)
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.start_catch_device_log(hilog_file_pipe)
                self._run_oh_kernel(config_file, request.listeners, request)
                hilog_file_pipe.flush()
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=False, error_no="03409")
            raise
        finally:
            # Kit teardown is already guaranteed by _run_oh_kernel(), the
            # only place kits are set up, so it is not repeated here.
            # The device may be unset if the failure happened before it was
            # resolved; dereferencing it blindly would raise a second error
            # from this finally block and hide the original one.
            device = getattr(self.config, "device", None)
            if device is not None:
                device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_oh_kernel(self, config_file, listeners=None, request=None):
        """
        Set up kits, run the kernel suite and always tear the kits down.

        :param config_file: path of the module's JSON config file
        :param listeners: listeners handed to the output parser
        :param request: the current request; used for kit setup/teardown
            and for the module name
        """
        try:
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)
            self._get_driver_config(json_config)
            do_module_kit_setup(request, self.kits)
            self.runner = OHKernelTestRunner(self.config)
            self.runner.suite_name = request.get_module_name()
            self.runner.run(listeners)
        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """
        Copy the driver section of ``json_config`` onto ``self.config``.

        Sets config.target_test_path (only when configured), config.arg_list
        (the options later turned into runtest flags) and config.timeout
        (falls back to TIME_OUT when "shell-timeout" is absent).
        """
        driver_section = json_config.get_driver()

        target_test_path = get_config_value('native-test-device-path',
                                            driver_section, False)
        if target_test_path:
            self.config.target_test_path = target_test_path

        # Insertion order decides the order of the flags on the command line.
        self.config.arg_list = {}
        for key in ("test-suite-name", "test-suites-list",
                    "timeout-limit", "conf-file"):
            value = get_config_value(key, driver_section, False)
            if value:
                self.config.arg_list[key] = value

        timeout_config = get_config_value('shell-timeout',
                                          driver_section, False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def __result__(self):
        """Return the result report path, or "" when it does not exist."""
        return self.result if os.path.exists(self.result) else ""
148
149
class OHKernelTestRunner:
    """
    Builds and executes the on-device ``runtest`` command for a kernel suite.

    ``config`` must provide ``arg_list``, ``target_test_path``, ``timeout``
    and ``device``; ``suite_name`` is assigned by the driver before run().
    """

    # Driver-config key -> runtest command-line flag.
    _FLAG_BY_KEY = {
        "test-suite-name": "-t",
        "test-suites-list": "-t",
        "conf-file": "-n",
        "timeout-limit": "-l",
    }

    def __init__(self, config):
        self.suite_name = None
        self.config = config
        self.arg_list = config.arg_list

    def run(self, listeners):
        """Execute the suite on the device, feeding output to the parser."""
        handler = self._get_shell_handler(listeners)
        # hdc shell cd /data/local/tmp/OH_kernel_test;
        # sh runtest test -t OpenHarmony_RK3568_config
        # -n OpenHarmony_RK3568_skiptest -l 60
        command = "cd %s; chmod +x *; sh runtest test %s" % (
            self.config.target_test_path, self.get_args_command())
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

    def _get_shell_handler(self, listeners):
        """
        Create a ShellHandler around a fresh kernel-test parser instance.

        Only the first registered parser plugin is used. When none is
        registered the handler is created without parsers.
        """
        plugins = get_plugin(Plugin.PARSER, CommonParserType.oh_kernel_test)
        parser_instances = []
        for plugin in (plugins or [])[:1]:
            parser_instance = plugin.__class__()
            parser_instance.suites_name = self.suite_name
            parser_instance.listeners = listeners
            parser_instances.append(parser_instance)
        return ShellHandler(parser_instances)

    def get_args_command(self):
        """
        Render ``arg_list`` as runtest flags.

        Each recognised key contributes " <flag> <value>" in dictionary
        order; unrecognised keys are ignored. Returns "" when nothing
        applies.
        """
        return "".join(
            " %s %s" % (self._FLAG_BY_KEY[key], value)
            for key, value in self.arg_list.items()
            if key in self._FLAG_BY_KEY)
189
190
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
class OHJSUnitTestDriver(IDriver):
    """
    Driver for the OpenHarmony JS unit test type.

    Parses the module's JSON config, sets up the configured kits and runs
    the suite through OHJSUnitTestRunner. When a dry run yields the list of
    expected tests, tests that did not execute are re-run (first together,
    then one by one).
    """

    def __init__(self):
        self.timeout = 80 * 1000
        self.start_time = None
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.runner = None
        self.rerun = True
        self.rerun_all = True

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Run the JS unit test module described by ``request``.

        Raises ParamError (error_no "00110") when the request has no source
        file. Any exception is logged, tagged with error_no "03409" when it
        does not already carry one, and re-raised. The result report is
        checked via check_result_report() whether or not the run succeeded.
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")
            self.result = os.path.join(
                request.config.report_path, "result",
                '.'.join((request.get_module_name(), "xml")))
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00110")
            LOG.debug("Test case file path: %s" % suite_file)
            hilog = get_device_log_file(request.config.report_path,
                                        request.get_module_name(),
                                        "device_hilog")

            # Issue the device command before opening the log file: a raw
            # descriptor from os.open() is only closed once os.fdopen() owns
            # it, so nothing that can raise may sit between the two calls.
            self.config.device.execute_shell_command(command="hilog -r")
            hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                                 FilePermission.mode_755)
            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
                self.config.device.start_catch_device_log(hilog_file_pipe)
                self._run_oh_jsunit(config_file, request)
        except Exception as exception:
            self.error_message = exception
            if not getattr(exception, "error_no", ""):
                setattr(exception, "error_no", "03409")
            LOG.exception(self.error_message, exc_info=True, error_no="03409")
            raise
        finally:
            # self.config is still None (or has no device) when the failure
            # happened before the device was resolved; dereferencing it
            # blindly would raise from this finally block and hide the
            # original error.
            device = getattr(self.config, "device", None)
            if device is not None:
                device.stop_catch_device_log()
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message)

    def _run_oh_jsunit(self, config_file, request):
        """
        Set up kits, configure the runner and execute the tests.

        Kits are always torn down afterwards. Raises ParamError (error_no
        "00102") when ``config_file`` does not exist.
        """
        try:
            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file)
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00102")
            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            self.config.device.hdc_command("target mount")
            do_module_kit_setup(request, self.kits)
            self.runner = OHJSUnitTestRunner(self.config)
            self.runner.suite_name = request.get_module_name()
            # execute test case
            self._get_runner_config(json_config)
            oh_jsunit_para_parse(self.runner, self.config.testargs)
            self._do_test_run(listener=request.listeners)

        finally:
            do_module_kit_teardown(request)

    def _get_driver_config(self, json_config):
        """
        Copy package/module/bundle names and the shell timeout from the
        driver section of ``json_config`` onto ``self.config``.

        Raises ParamError (error_no "03201") when neither a package name nor
        a module name is configured. config.timeout falls back to TIME_OUT
        when "shell-timeout" is absent.
        """
        driver_section = json_config.get_driver()
        package = get_config_value('package-name', driver_section, False)
        module = get_config_value('module-name', driver_section, False)
        bundle = get_config_value('bundle-name', driver_section, False)

        self.config.package_name = package
        self.config.module_name = module
        self.config.bundle_name = bundle

        if not package and not module:
            raise ParamError("Neither package nor module is found"
                             " in config file.", error_no="03201")
        timeout_config = get_config_value("shell-timeout",
                                          driver_section, False)
        if timeout_config:
            self.config.timeout = int(timeout_config)
        else:
            self.config.timeout = TIME_OUT

    def _get_runner_config(self, json_config):
        """
        Forward the optional "test-timeout" and "testcase-timeout" driver
        settings to the runner as its "wait_time" and "timeout" arguments.
        """
        driver_section = json_config.get_driver()
        test_timeout = get_config_value('test-timeout',
                                        driver_section, False)
        if test_timeout:
            self.runner.add_arg("wait_time", int(test_timeout))

        testcase_timeout = get_config_value('testcase-timeout',
                                            driver_section, False)
        if testcase_timeout:
            self.runner.add_arg("timeout", int(testcase_timeout))

    def _do_test_run(self, listener):
        """
        Run the suite; use the rerun flow when the expected tests are known.
        """
        test_to_run = self._collect_test_to_run()
        LOG.info("Collected test count is: %s" % (len(test_to_run)
                                                  if test_to_run else 0))
        if not test_to_run:
            self.runner.run(listener)
        else:
            self._run_with_rerun(listener, test_to_run)

    def _collect_test_to_run(self):
        """
        Return the runner's dry-run result, or None when rerun is disabled.
        """
        if self.rerun:
            return self.runner.dry_run()
        return None

    def _run_tests(self, listener):
        """
        Run the suite once and return the tests the tracker collected.

        A copy of ``listener`` is extended with a CollectingPassListener so
        the caller's list is left untouched.
        """
        test_tracker = CollectingPassListener()
        listener_copy = listener.copy()
        listener_copy.append(test_tracker)
        self.runner.run(listener_copy)
        return test_tracker.get_current_run_results()

    def _run_with_rerun(self, listener, expected_tests):
        """
        Run the suite and re-run whatever part of ``expected_tests`` is
        missing from the collected results.
        """
        LOG.debug("Ready to run with rerun, expect run: %s"
                  % len(expected_tests))
        test_run = self._run_tests(listener)
        # Parenthesised on purpose: "%" binds tighter than the conditional
        # expression, so without the parentheses an empty run would log a
        # bare 0 instead of the message.
        run_count = len(test_run) if test_run else 0
        LOG.debug("Run with rerun, has run: %s" % run_count)
        if run_count >= len(expected_tests):
            return

        expected_tests = TestDescription.remove_test(expected_tests,
                                                     test_run)
        if not expected_tests:
            LOG.debug("No tests to re-run, all tests executed at least "
                      "once.")
            # Without this return an empty "class" filter would be dropped
            # by the runner and the whole suite would run a second time.
            return
        if self.rerun_all:
            self._rerun_all(expected_tests, listener)
        else:
            self._rerun_serially(expected_tests, listener)

    def _rerun_all(self, expected_tests, listener):
        """
        Re-run all of ``expected_tests`` in a single invocation, then fall
        back to one-by-one reruns for those still missing.
        """
        tests = ["%s#%s" % (test.class_name, test.test_name)
                 for test in expected_tests]
        self.runner.add_arg("class", ",".join(tests))
        LOG.debug("Ready to rerun all, expect run: %s" % len(expected_tests))
        test_run = self._run_tests(listener)
        LOG.debug("Rerun all, has run: %s" % len(test_run))
        if len(test_run) >= len(expected_tests):
            return

        expected_tests = TestDescription.remove_test(expected_tests,
                                                     test_run)
        if not expected_tests:
            LOG.debug("Rerun textFile success")
            return
        self._rerun_serially(expected_tests, listener)

    def _rerun_serially(self, expected_tests, listener):
        """Re-run each of ``expected_tests`` in its own invocation."""
        LOG.debug("Rerun serially, expected run: %s" % len(expected_tests))
        for test in expected_tests:
            self.runner.add_arg(
                "class", "%s#%s" % (test.class_name, test.test_name))
            self.runner.rerun(listener, test)
            self.runner.remove_arg("class")

    def __result__(self):
        """Return the result report path, or "" when it does not exist."""
        return self.result if os.path.exists(self.result) else ""
375
376
class OHJSUnitTestRunner:
    """
    Builds and executes ``aa test`` commands for a JS unit test suite.

    ``config`` must provide ``package_name``, ``module_name``,
    ``bundle_name``, ``timeout`` and ``device``; ``suite_name`` is assigned
    by the driver. ``rerun_attemp`` is the number of remaining single-test
    reruns that are allowed to come back without any result.
    """

    def __init__(self, config):
        self.arg_list = {}
        self.suite_name = None
        self.config = config
        self.rerun_attemp = 3

    def dry_run(self):
        """
        Execute the dry-run command and return the tests its parser listed.

        Returns an empty list when no list-parser plugin is registered.
        """
        handler, parser_instances = self._create_handler(
            CommonParserType.oh_jsunit_list, bind=False)
        self._execute(self._get_dry_run_command(), handler)
        if not parser_instances:
            LOG.error("No parser is available for the dry run result")
            return []
        return parser_instances[0].tests

    def run(self, listener):
        """Execute the suite, reporting parsed results to ``listener``."""
        handler, _ = self._create_handler(CommonParserType.oh_jsunit,
                                          listener)
        self._execute(self._get_run_command(), handler)

    def rerun(self, listener, test):
        """
        Re-run with the currently configured arguments on behalf of ``test``.

        While rerun attempts remain, the command is executed; when it yields
        no result at all, one attempt is consumed and ``test`` is marked as
        blocked. Once the attempts are used up, ``test`` is marked as blocked
        without executing anything.
        """
        if not self.rerun_attemp:
            LOG.debug("Not execute and mark as blocked finally")
            handler, _ = self._create_handler(CommonParserType.cpptest,
                                              listener)
            self._mark_blocked(handler, test)
            return

        handler = None
        test_tracker = CollectingPassListener()
        try:
            listener_copy = listener.copy()
            listener_copy.append(test_tracker)
            handler, _ = self._create_handler(CommonParserType.oh_jsunit,
                                              listener_copy)
            self._execute(self._get_run_command(), handler)
        except ShellCommandUnresponsiveException as _:
            LOG.debug("Exception: ShellCommandUnresponsiveException")
        finally:
            if not test_tracker.get_current_run_results():
                LOG.debug("No test case is obtained finally")
                self.rerun_attemp -= 1
                # handler is still None when the failure happened before it
                # was created; _mark_blocked() tolerates that.
                self._mark_blocked(handler, test)

    def add_arg(self, name, value):
        """Store an argument; calls with a falsy name or value are ignored."""
        if not name or not value:
            return
        self.arg_list[name] = value

    def remove_arg(self, name):
        """Drop the argument ``name`` if it is present."""
        if not name:
            return
        if name in self.arg_list:
            del self.arg_list[name]

    def get_args_command(self):
        """
        Render ``arg_list`` as ``aa test`` options.

        "wait_time" becomes " -w <value> "; every other entry becomes
        " -s <name> <value> ". Returns "" when there are no arguments.
        """
        pieces = []
        for key, value in self.arg_list.items():
            if "wait_time" == key:
                pieces.append(" -w %s " % (value,))
            else:
                pieces.append(" -s %s %s " % (key, value))
        return "".join(pieces)

    def _get_run_command(self):
        """Return the command that runs the suite, or "" if unconfigured."""
        return self._build_command()

    def _get_dry_run_command(self):
        """Return the dry-run variant of the run command."""
        return self._build_command(" -s dryRun true")

    def _build_command(self, suffix=""):
        """
        Assemble the ``aa test`` command line.

        The package name takes precedence over the module name; "" is
        returned when neither is configured. ``suffix`` is appended verbatim
        after the rendered arguments.
        """
        if self.config.package_name:
            # aa test -p ${packageName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            target = "-p %s" % (self.config.package_name,)
        elif self.config.module_name:
            # aa test -m ${moduleName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            target = "-m %s" % (self.config.module_name,)
        else:
            return ""
        return "aa test %s -b %s -s unittest OpenHarmonyTestRunner %s%s" % (
            target, self.config.bundle_name, self.get_args_command(), suffix)

    def _create_handler(self, parser_type, listeners=None, bind=True):
        """
        Instantiate the first parser plugin registered for ``parser_type``
        and wrap it in a ShellHandler.

        :param bind: when true, the parser gets this runner's suite name
            and ``listeners`` assigned
        :return: (handler, parser_instances); the list is empty when no
            plugin is registered
        """
        plugins = get_plugin(Plugin.PARSER, parser_type)
        parser_instances = []
        for plugin in (plugins or [])[:1]:
            parser_instance = plugin.__class__()
            if bind:
                parser_instance.suite_name = self.suite_name
                parser_instance.listeners = listeners
            parser_instances.append(parser_instance)
        return ShellHandler(parser_instances), parser_instances

    def _execute(self, command, handler):
        """Run ``command`` on the device once, streaming to ``handler``."""
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

    @staticmethod
    def _mark_blocked(handler, test):
        """Mark ``test`` as blocked through the handler's first parser."""
        parsers = getattr(handler, "parsers", None)
        if parsers:
            parsers[0].mark_test_as_blocked(test)
        else:
            LOG.debug("No parser is available to mark test as blocked")
509
510