• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import time
22from dataclasses import dataclass
23
24from xdevice import DeviceTestType
25from xdevice import IDriver
26from xdevice import Plugin
27from xdevice import platform_logger
28from xdevice import DeviceLabelType
29from xdevice import ShellHandler
30from xdevice import ExecuteTerminate
31from xdevice import get_plugin
32from xdevice import JsonParser
33from xdevice import get_config_value
34from xdevice import get_kit_instances
35from xdevice import check_result_report
36from xdevice import get_device_log_file
37from xdevice import get_test_component_version
38from xdevice import ParamError
39from ohos.constants import ParserType
40from ohos.constants import ComType
41from ohos.constants import CKit
42from ohos.exception import LiteDeviceExecuteCommandError
43from core.utils import get_filename_extension
44from core.testkit.kit_lite import DeployKit
45from core.config.resource_manager import ResourceManager
46from core.config.config_manager import UserConfigManager
47
48__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"]
49LOG = platform_logger("LiteUnitTest")
50
51
52def get_level_para_string(level_string):
53    level_list = list(set(level_string.split(",")))
54    level_para_string = ""
55    for item in level_list:
56        if not item.isdigit():
57            continue
58        item = item.strip(" ")
59        level_para_string = f"{level_para_string}Level{item},"
60    level_para_string = level_para_string.strip(",")
61    return level_para_string
62
63
64@dataclass
65class GTestConst(object):
66    exec_para_filter = "--gtest_filter"
67    exec_para_level = "--gtest_testsize"
68
69
70@Plugin(type=Plugin.DRIVER, id=DeviceTestType.lite_cpp_test)
71class LiteUnitTest(IDriver):
72    """
73    lite gtest test driver for L1
74    """
75    config = None
76    log = platform_logger("LiteUnitTest")
77    nfs_dir = ""
78    mnt_cmd = ""
79    lite_device = None
80    result = None
81
82    def __check_failed__(self, msg):
83        self.log.error("check failed {}".format(msg))
84        return
85
86    def __check_environment__(self, device_options):
87        pass
88
89    def __check_config__(self, config):
90        """
91        1. check serial protocol
92        2. login device
93        3. NFS is available
94        :param config: serial device
95        :return:
96        """
97        self.log.error("Lite driver check config:{}".format(config))
98
99    def __execute__(self, request):
100        """
101
102        1. select test case by subsystem, module, suite
103        2. open test dir
104        3、execute single test case, eg. ./test_demo
105        :param request: contains test condition, sub_system
106            module_name,test_suit,
107        test_case,test_level,test_case_dir
108        :return:
109        """
110        self.log.debug("Test suite FilePath: %s" %
111                      request.root.source.source_file)
112        self.lite_device = request.config.environment.devices[0]
113        self.lite_device.connect()
114        if not self._before_execute_test():
115            self.log.error("open test dir failed")
116            return
117        self.log.debug("open test dir success")
118        if self._execute_test(request) == "":
119            self.log.error("execute test command failed")
120            return
121        self.log.info("execute test command success")
122        if not self._after_execute_test(request):
123            self.log.error("after execute test failed")
124            return
125        self.log.info("lite device execute request success")
126
127    def _mount_nfs_server(self):
128        #before execute each suits bin, mount nfs
129        self.mnt_cmd = "mount {}".format(UserConfigManager().get_user_config(
130            "NFS").get("mnt_cmd"))
131        if self.mnt_cmd == "mount ":
132            self.log.error("no configure for mount command")
133            return
134
135        filter_result, status, _ = \
136            self.lite_device.execute_command_with_timeout(
137            self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, timeout=3)
138        if "already mounted" in filter_result:
139            self.log.info("nfs has been mounted")
140            return
141
142        for i in range(0, 2):
143            if status:
144                self.log.info("execute mount command success")
145                return
146            self.log.info("try mount %d" % (i + 2))
147            _, status, _ = self.lite_device.execute_command_with_timeout(
148                self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test,
149                timeout=3)
150
151        self.log.error("execute mount command failed")
152
153    def _before_execute_test(self):
154        """
155        need copy test case to nfs dir
156        :param request: nfs dir, test case path
157        :return:
158        """
159        self.nfs_dir = \
160            UserConfigManager().get_user_config("NFS").get("host_dir")
161        if self.nfs_dir == "":
162            self.log.error("no configure for nfs directory")
163            return False
164        self._mount_nfs_server()
165        _, status, _ = \
166            self.lite_device.execute_command_with_timeout("cd /{}".format(
167                UserConfigManager().get_user_config("NFS").get("board_dir")),
168            case_type=DeviceTestType.lite_cpp_test)
169        if not status:
170            self.log.error("pre execute command failed")
171            return False
172        self.log.info("pre execute command success")
173        return True
174
175    def _execute_test(self, request):
176        test_case = request.root.source.source_file
177        self.config = request.config
178        test_para = self._get_test_para(self.config.testcase,
179                                       self.config.testlevel)
180        case_name = os.path.basename(test_case)
181        if os.path.exists(os.path.join(self.nfs_dir, case_name)):
182            os.remove(os.path.join(self.nfs_dir, case_name))
183        result_name = case_name + ".xml"
184        result_file = os.path.join(self.nfs_dir, result_name)
185        if os.path.exists(result_file):
186            os.remove(result_file)
187        shutil.copyfile(test_case, os.path.join(self.nfs_dir, case_name))
188        # push resource files
189        resource_manager = ResourceManager()
190        resource_data_dic, resource_dir = \
191            resource_manager.get_resource_data_dic(test_case)
192        resource_manager.lite_process_preparer_data(resource_data_dic, resource_dir)
193        self.lite_device.execute_command_with_timeout(
194            "chmod 777 {}".format(case_name),
195            case_type=DeviceTestType.lite_cpp_test)
196        test_command = "./%s %s" % (case_name, test_para)
197        case_result, status, _ = \
198            self.lite_device.execute_command_with_timeout(
199            test_command, case_type=DeviceTestType.lite_cpp_test)
200        if status:
201            self.log.info("test case result:\n %s" % case_result)
202            return
203        self.log.error("failed case: %s" % test_case)
204
205    def _get_test_para(self, testcase, testlevel):
206        if "" != testcase and "" == testlevel:
207            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
208        elif "" == testcase and "" != testlevel:
209            level_para = get_level_para_string(testlevel)
210            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
211        else:
212            test_para = ""
213        return test_para
214
215    def _after_execute_test(self, request):
216        """
217        copy test result to result dir
218        :param request:
219        :return:
220        """
221        if request.config is None:
222            self.log.error("test config is null")
223            return False
224        report_path = request.config.report_path
225        test_result = os.path.join(report_path, "result")
226        test_case = request.root.source.source_file
227        case_name = os.path.basename(test_case)
228        if not os.path.exists(test_result):
229            os.mkdir(test_result)
230        sub_system_module = test_case.split(
231            "unittest" + os.sep)[1].split(os.sep + "bin")[0]
232        if os.sep in sub_system_module:
233            sub_system = sub_system_module.split(os.sep)[0]
234            module_name = sub_system_module.split(os.sep)[1]
235            subsystem_dir = os.path.join(test_result, sub_system)
236            if not os.path.exists(subsystem_dir):
237                os.mkdir(subsystem_dir)
238            module_dir = os.path.join(subsystem_dir, module_name)
239            if not os.path.exists(module_dir):
240                os.mkdir(module_dir)
241            test_result = module_dir
242        else:
243            if sub_system_module != "":
244                test_result = os.path.join(test_result, sub_system_module)
245                if not os.path.exists(test_result):
246                    os.mkdir(test_result)
247        result_name = case_name + ".xml"
248        result_file = os.path.join(self.nfs_dir, result_name)
249        if not self._check_xml_exist(result_name):
250            self.log.error("result xml file %s not exist." % result_name)
251        if not os.path.exists(result_file):
252            self.log.error("file %s not exist." % result_file)
253            self._clear_nfs_space()
254            return False
255        file_name = os.path.basename(result_file)
256        final_result = os.path.join(test_result, file_name)
257        shutil.copyfile(result_file,
258                        final_result)
259        self.log.info("after execute test")
260        self._clear_nfs_space()
261        self.lite_device.close()
262        return True
263
264    def _clear_nfs_space(self):
265        _, status, _ = \
266            self.lite_device.execute_command_with_timeout(
267                "cd ..",
268                case_type=DeviceTestType.lite_cpp_test)
269        _, status, _ = \
270            self.lite_device.execute_command_with_timeout(
271                "umount %s" % UserConfigManager().get_user_config("NFS").get("board_dir"),
272                case_type=DeviceTestType.lite_cpp_test)
273        shutil.rmtree(self.nfs_dir)
274        os.mkdir(self.nfs_dir)
275
276    def _check_xml_exist(self, xml_file, timeout=10):
277        ls_command = \
278            "ls /%s" % \
279            UserConfigManager().get_user_config("NFS").get("board_dir")
280        start_time = time.time()
281        while time.time() - start_time < timeout:
282            result, _, _ = self.lite_device.execute_command_with_timeout(
283                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
284                timeout=5, receiver=None)
285            if xml_file in result:
286                return True
287            time.sleep(5)
288        return False
289
290    def show_help_info(self):
291        """
292        show help info.
293        """
294        self.log.info("this is test driver for cpp test")
295        return
296
297    def show_driver_info(self):
298        """
299        show driver info.
300        """
301        self.log.info("this is test driver for cpp test")
302        return
303
304    def __result__(self):
305        pass
306
307
308@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
309class CTestDriver(IDriver):
310    """
311    CTest is a test that runs a native test package on given lite device.
312    """
313    config = None
314    result = ""
315    error_message = ""
316    version_cmd = "AT+CSV"
317
318    def __init__(self):
319        self.file_name = ""
320
321    def __check_environment__(self, device_options):
322        if len(device_options) != 1 or \
323                device_options[0].label != DeviceLabelType.wifiiot:
324            self.error_message = "check environment failed"
325            return False
326        return True
327
328    def __check_config__(self, config=None):
329        del config
330        self.config = None
331
332    def __execute__(self, request):
333        from xdevice import Variables
334        try:
335            self.config = request.config
336            self.config.device = request.config.environment.devices[0]
337
338            if request.config.resource_path:
339                current_dir = request.config.resource_path
340            else:
341                current_dir = Variables.exec_dir
342
343            config_file = request.root.source.config_file.strip()
344            if config_file:
345                source = os.path.join(current_dir, config_file)
346                self.file_name = os.path.basename(config_file).split(".")[0]
347            else:
348                source = request.root.source.source_string.strip()
349
350            self._run_ctest(source=source, request=request)
351
352        except (LiteDeviceExecuteCommandError, Exception) as exception:
353            LOG.error(exception, error_no=getattr(exception, "error_no",
354                                                  "00000"))
355            self.error_message = exception
356        finally:
357            if request.root.source.test_name.startswith("{"):
358                report_name = "report"
359            else:
360                report_name = get_filename_extension(
361                    request.root.source.test_name)[0]
362
363            self.result = check_result_report(request.config.report_path,
364                                              self.result,
365                                              self.error_message,
366                                              report_name)
367
368    def _run_ctest(self, source=None, request=None):
369        if not source:
370            LOG.error("Error: %s don't exist." % source, error_no="00101")
371            return
372
373        try:
374            parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
375            version = get_test_component_version(self.config)
376            parser_instances = []
377            for parser in parsers:
378                parser_instance = parser.__class__()
379                parser_instance.suites_name = self.file_name
380                parser_instance.product_info.setdefault("Version", version)
381                parser_instance.listeners = request.listeners
382                parser_instances.append(parser_instance)
383            handler = ShellHandler(parser_instances)
384
385            reset_cmd = self._reset_device(request, source)
386            self.result = "%s.xml" % os.path.join(request.config.report_path,
387                                                  "result", self.file_name)
388
389            self.config.device.device.com_dict.get(
390                ComType.deploy_com).connect()
391
392            result, _, error = self.config.device.device. \
393                execute_command_with_timeout(
394                    command=reset_cmd,
395                    case_type=DeviceTestType.ctest_lite,
396                    key=ComType.deploy_com,
397                    timeout=90,
398                    receiver=handler)
399
400            device_log_file = get_device_log_file(request.config.report_path,
401                                                  request.config.device.
402                                                  __get_serial__())
403
404            device_log_file_open = os.open(device_log_file, os.O_WRONLY |
405                os.O_CREAT | os.O_APPEND, 0o755)
406            with os.fdopen(device_log_file_open, "a") as file_name:
407                file_name.write("{}{}".format(
408                    "\n".join(result.split("\n")[0:-1]), "\n"))
409                file_name.flush()
410        finally:
411            self.config.device.device.com_dict.get(ComType.deploy_com).close()
412
413    def _reset_device(self, request, source):
414        json_config = JsonParser(source)
415        reset_cmd = []
416        kit_instances = get_kit_instances(json_config,
417                                          request.config.resource_path,
418                                          request.config.testcases_path)
419        from xdevice import Scheduler
420
421        for (kit_instance, kit_info) in zip(kit_instances,
422                                            json_config.get_kits()):
423            if not isinstance(kit_instance, DeployKit):
424                continue
425            if not self.file_name:
426                self.file_name = get_config_value(
427                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
428            reset_cmd = kit_instance.burn_command
429            if not Scheduler.is_execute:
430                raise ExecuteTerminate("ExecuteTerminate", error_no="00300")
431
432            kit_instance.__setup__(self.config.device,
433                source_file=request.root.source.source_file.strip())
434
435        reset_cmd = [int(item, 16) for item in reset_cmd]
436        return reset_cmd
437
438    def __result__(self):
439        return self.result if os.path.exists(self.result) else ""
440
441
442
443@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
444class JSUnitTestLiteDriver(IDriver):
445    """
446    JSUnitTestDriver is a Test that runs a native test package on given device.
447    """
448
449    def __init__(self):
450        self.result = ""
451        self.error_message = ""
452        self.kits = []
453        self.config = None
454
455    def __check_environment__(self, device_options):
456        pass
457
458    def __check_config__(self, config):
459        pass
460
461    def _get_driver_config(self, json_config):
462        bundle_name = get_config_value('bundle-name',
463                                       json_config.get_driver(), False)
464        if not bundle_name:
465            raise ParamError("Can't find bundle-name in config file.",
466                             error_no="00108")
467        else:
468            self.config.bundle_name = bundle_name
469
470        ability = get_config_value('ability',
471                                   json_config.get_driver(), False)
472        if not ability:
473            self.config.ability = "default"
474        else:
475            self.config.ability = ability
476
477    def __execute__(self, request):
478        try:
479            LOG.debug("Start execute xdevice extension JSUnit Test")
480
481            self.config = request.config
482            self.config.device = request.config.environment.devices[0]
483
484            config_file = request.root.source.config_file
485            suite_file = request.root.source.source_file
486
487            if not suite_file:
488                raise ParamError(
489                    "test source '%s' not exists" %
490                    request.root.source.source_string, error_no="00101")
491
492            if not os.path.exists(config_file):
493                LOG.error("Error: Test cases don't exist %s." % config_file,
494                          error_no="00101")
495                raise ParamError(
496                    "Error: Test cases don't exist %s." % config_file,
497                    error_no="00101")
498
499            self.file_name = os.path.basename(
500                request.root.source.source_file.strip()).split(".")[0]
501
502            self.result = "%s.xml" % os.path.join(
503                request.config.report_path, "result", self.file_name)
504
505            json_config = JsonParser(config_file)
506            self.kits = get_kit_instances(json_config,
507                                          self.config.resource_path,
508                                          self.config.testcases_path)
509
510            self._get_driver_config(json_config)
511
512            from xdevice import Scheduler
513            for kit in self.kits:
514                if not Scheduler.is_execute:
515                    raise ExecuteTerminate("ExecuteTerminate",
516                                           error_no="00300")
517                if kit.__class__.__name__ == CKit.liteinstall:
518                    kit.bundle_name = self.config.bundle_name
519                kit.__setup__(self.config.device, request=request)
520
521            self._run_jsunit(request)
522
523        except Exception as exception:
524            self.error_message = exception
525        finally:
526            report_name = "report" if request.root.source. \
527                test_name.startswith("{") else get_filename_extension(
528                request.root.source.test_name)[0]
529
530            self.result = check_result_report(
531                request.config.report_path, self.result, self.error_message,
532                report_name)
533
534            for kit in self.kits:
535                kit.__teardown__(self.config.device)
536
537            self.config.device.close()
538
539    def _run_jsunit(self, request):
540        parser_instances = []
541        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
542        for parser in parsers:
543            parser_instance = parser.__class__()
544            parser_instance.suites_name = self.file_name
545            parser_instance.listeners = request.listeners
546            parser_instances.append(parser_instance)
547        handler = ShellHandler(parser_instances)
548
549        command = "./bin/aa start -p %s -n %s" % \
550                  (self.config.bundle_name, self.config.ability)
551        result, _, error = self.config.device.execute_command_with_timeout(
552            command=command,
553            timeout=300,
554            receiver=handler)
555
556        device_log_file = get_device_log_file(request.config.report_path,
557            request.config.device.
558            __get_serial__())
559
560        device_log_file_open = os.open(device_log_file, os.O_WRONLY |
561            os.O_CREAT | os.O_APPEND, 0o755)
562        with os.fdopen(device_log_file_open, "a") as file_name:
563            file_name.write("{}{}".format(
564                "\n".join(result.split("\n")[0:-1]), "\n"))
565            file_name.flush()
566
567    def __result__(self):
568        return self.result if os.path.exists(self.result) else ""