• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import time
22from dataclasses import dataclass
23
24from xdevice import DeviceTestType
25from xdevice import IDriver
26from xdevice import Plugin
27from xdevice import platform_logger
28from xdevice import DeviceLabelType
29from xdevice import ComType
30from xdevice import ParserType
31from xdevice import ShellHandler
32from xdevice import ExecuteTerminate
33from xdevice import LiteDeviceExecuteCommandError
34from xdevice import get_plugin
35from xdevice import JsonParser
36from xdevice import get_config_value
37from xdevice import get_kit_instances
38from xdevice import check_result_report
39from xdevice import get_device_log_file
40from xdevice import get_test_component_version
41from xdevice import ParamError
42from core.utils import get_filename_extension
43from core.testkit.kit_lite import DeployKit
44from core.config.resource_manager import ResourceManager
45from core.config.config_manager import UserConfigManager
46
47__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"]
48LOG = platform_logger("LiteUnitTest")
49
50
51def get_level_para_string(level_string):
52    level_list = list(set(level_string.split(",")))
53    level_para_string = ""
54    for item in level_list:
55        if not item.isdigit():
56            continue
57        item = item.strip(" ")
58        level_para_string += ("Level%s," % item)
59    level_para_string = level_para_string.strip(",")
60    return level_para_string
61
62
63@dataclass
64class GTestConst(object):
65    exec_para_filter = "--gtest_filter"
66    exec_para_level = "--gtest_testsize"
67
68
69@Plugin(type=Plugin.DRIVER, id=DeviceTestType.lite_cpp_test)
70class LiteUnitTest(IDriver):
71    """
72    lite gtest test driver for L1
73    """
74    config = None
75    log = platform_logger("LiteUnitTest")
76    nfs_dir = ""
77    mnt_cmd = ""
78    lite_device = None
79    result = None
80
81    def __check_failed__(self, msg):
82        self.log.error("check failed {}".format(msg))
83        return None
84
85    def __check_environment__(self, device_options):
86        pass
87
88    def __check_config__(self, config):
89        """
90        1. check serial protocol
91        2. login device
92        3. NFS is available
93        :param config: serial device
94        :return:
95        """
96        self.log.error("Lite driver check config:{}".format(config))
97
98    def __execute__(self, request):
99        """
100
101        1. select test case by subsystem, module, suite
102        2. open test dir
103        3、execute single test case, eg. ./test_demo
104        :param request: contains test condition, sub_system
105            module_name,test_suit,
106        test_case,test_level,test_case_dir
107        :return:
108        """
109        self.log.debug("Test suite FilePath: %s" %
110                      request.root.source.source_file)
111        self.lite_device = request.config.environment.devices[0]
112        self.lite_device.connect()
113        if not self._before_execute_test():
114            self.log.error("open test dir failed")
115            return
116        self.log.debug("open test dir success")
117        if self._execute_test(request) == "":
118            self.log.error("execute test command failed")
119            return
120        self.log.info("execute test command success")
121        if not self._after_execute_test(request):
122            self.log.error("after execute test failed")
123            return
124        self.log.info("lite device execute request success")
125
126    def _mount_nfs_server(self):
127        #before execute each suits bin, mount nfs
128        self.mnt_cmd = "mount {}".format(UserConfigManager().get_user_config(
129            "NFS").get("mnt_cmd"))
130        if self.mnt_cmd == "mount ":
131            self.log.error("no configure for mount command")
132            return
133
134        filter_result, status, _ = \
135            self.lite_device.execute_command_with_timeout(
136            self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, timeout=3)
137        if "already mounted" in filter_result:
138            self.log.info("nfs has been mounted")
139            return
140
141        for i in range(0, 2):
142            if status:
143                self.log.info("execute mount command success")
144                return
145            self.log.info("try mount %d" % (i + 2))
146            _, status, _ = self.lite_device.execute_command_with_timeout(
147                self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test,
148                timeout=3)
149
150        self.log.error("execute mount command failed")
151
152    def _before_execute_test(self):
153        """
154        need copy test case to nfs dir
155        :param request: nfs dir, test case path
156        :return:
157        """
158        self.nfs_dir = \
159            UserConfigManager().get_user_config("NFS").get("host_dir")
160        if self.nfs_dir == "":
161            self.log.error("no configure for nfs directory")
162            return False
163        self._mount_nfs_server()
164        _, status, _ = \
165            self.lite_device.execute_command_with_timeout("cd /{}".format(
166                UserConfigManager().get_user_config("NFS").get("board_dir")),
167            case_type=DeviceTestType.lite_cpp_test)
168        if not status:
169            self.log.error("pre execute command failed")
170            return False
171        self.log.info("pre execute command success")
172        return True
173
174    def _execute_test(self, request):
175        test_case = request.root.source.source_file
176        self.config = request.config
177        test_para = self._get_test_para(self.config.testcase,
178                                       self.config.testlevel)
179        case_name = os.path.basename(test_case)
180        if os.path.exists(os.path.join(self.nfs_dir, case_name)):
181            os.remove(os.path.join(self.nfs_dir, case_name))
182        result_name = case_name + ".xml"
183        result_file = os.path.join(self.nfs_dir, result_name)
184        if os.path.exists(result_file):
185            os.remove(result_file)
186        shutil.copyfile(test_case, os.path.join(self.nfs_dir, case_name))
187        # push resource files
188        resource_manager = ResourceManager()
189        resource_data_dic, resource_dir = \
190            resource_manager.get_resource_data_dic(test_case)
191        resource_manager.lite_process_preparer_data(resource_data_dic, resource_dir)
192        self.lite_device.execute_command_with_timeout(
193            "chmod 777 {}".format(case_name),
194            case_type=DeviceTestType.lite_cpp_test)
195        test_command = "./%s %s" % (case_name, test_para)
196        case_result, status, _ = \
197            self.lite_device.execute_command_with_timeout(
198            test_command, case_type=DeviceTestType.lite_cpp_test)
199        if status:
200            self.log.info("test case result:\n %s" % case_result)
201            return
202        self.log.error("failed case: %s" % test_case)
203
204    def _get_test_para(self, testcase, testlevel):
205        if "" != testcase and "" == testlevel:
206            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
207        elif "" == testcase and "" != testlevel:
208            level_para = get_level_para_string(testlevel)
209            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
210        else:
211            test_para = ""
212        return test_para
213
214    def _after_execute_test(self, request):
215        """
216        copy test result to result dir
217        :param request:
218        :return:
219        """
220        if request.config is None:
221            self.log.error("test config is null")
222            return False
223        report_path = request.config.report_path
224        test_result = os.path.join(report_path, "result")
225        test_case = request.root.source.source_file
226        case_name = os.path.basename(test_case)
227        if not os.path.exists(test_result):
228            os.mkdir(test_result)
229        sub_system_module = test_case.split(
230            "unittest" + os.sep)[1].split(os.sep + "bin")[0]
231        if os.sep in sub_system_module:
232            sub_system = sub_system_module.split(os.sep)[0]
233            module_name = sub_system_module.split(os.sep)[1]
234            subsystem_dir = os.path.join(test_result, sub_system)
235            if not os.path.exists(subsystem_dir):
236                os.mkdir(subsystem_dir)
237            module_dir = os.path.join(subsystem_dir, module_name)
238            if not os.path.exists(module_dir):
239                os.mkdir(module_dir)
240            test_result = module_dir
241        else:
242            if sub_system_module != "":
243                test_result = os.path.join(test_result, sub_system_module)
244                if not os.path.exists(test_result):
245                    os.mkdir(test_result)
246        result_name = case_name + ".xml"
247        result_file = os.path.join(self.nfs_dir, result_name)
248        if not self._check_xml_exist(result_name):
249            self.log.error("result xml file %s not exist." % result_name)
250        if not os.path.exists(result_file):
251            self.log.error("file %s not exist." % result_file)
252            self._clear_nfs_space()
253            return False
254        file_name = os.path.basename(result_file)
255        final_result = os.path.join(test_result, file_name)
256        shutil.copyfile(result_file,
257                        final_result)
258        self.log.info("after execute test")
259        self._clear_nfs_space()
260        self.lite_device.close()
261        return True
262
263    def _clear_nfs_space(self):
264        _, status, _ = \
265            self.lite_device.execute_command_with_timeout(
266                "cd ..",
267                case_type=DeviceTestType.lite_cpp_test)
268        _, status, _ = \
269            self.lite_device.execute_command_with_timeout(
270                "umount %s" % UserConfigManager().get_user_config("NFS").get("board_dir"),
271                case_type=DeviceTestType.lite_cpp_test)
272        shutil.rmtree(self.nfs_dir)
273        os.mkdir(self.nfs_dir)
274
275    def _check_xml_exist(self, xml_file, timeout=10):
276        ls_command = \
277            "ls /%s" % \
278            UserConfigManager().get_user_config("NFS").get("board_dir")
279        start_time = time.time()
280        while time.time()-start_time < timeout:
281            result, _, _ = self.lite_device.execute_command_with_timeout(
282                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
283                timeout=5, receiver=None)
284            if xml_file in result:
285                return True
286            time.sleep(5)
287        return False
288
289    def show_help_info(self):
290        """
291        show help info.
292        """
293        self.log.info("this is test driver for cpp test")
294        return None
295
296    def show_driver_info(self):
297        """
298        show driver info.
299        """
300        self.log.info("this is test driver for cpp test")
301        return None
302
303    def __result__(self):
304        pass
305
306
307@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
308class CTestDriver(IDriver):
309    """
310    CTest is a test that runs a native test package on given lite device.
311    """
312    config = None
313    result = ""
314    error_message = ""
315    version_cmd = "AT+CSV"
316
317    def __init__(self):
318        self.file_name = ""
319
320    def __check_environment__(self, device_options):
321        if len(device_options) != 1 or \
322                device_options[0].label != DeviceLabelType.wifiiot:
323            self.error_message = "check environment failed"
324            return False
325        return True
326
327    def __check_config__(self, config=None):
328        del config
329        self.config = None
330
331    def __execute__(self, request):
332        from xdevice import Variables
333        try:
334            self.config = request.config
335            self.config.device = request.config.environment.devices[0]
336
337            if request.config.resource_path:
338                current_dir = request.config.resource_path
339            else:
340                current_dir = Variables.exec_dir
341
342            config_file = request.root.source.config_file.strip()
343            if config_file:
344                source = os.path.join(current_dir, config_file)
345                self.file_name = os.path.basename(config_file).split(".")[0]
346            else:
347                source = request.root.source.source_string.strip()
348
349            self._run_ctest(source=source, request=request)
350
351        except (LiteDeviceExecuteCommandError, Exception) as exception:
352            LOG.error(exception, error_no=getattr(exception, "error_no",
353                                                  "00000"))
354            self.error_message = exception
355        finally:
356            if request.root.source.test_name.startswith("{"):
357                report_name = "report"
358            else:
359                report_name = get_filename_extension(
360                    request.root.source.test_name)[0]
361
362            self.result = check_result_report(request.config.report_path,
363                                              self.result,
364                                              self.error_message,
365                                              report_name)
366
367    def _run_ctest(self, source=None, request=None):
368        if not source:
369            LOG.error("Error: %s don't exist." % source, error_no="00101")
370            return
371
372        try:
373            parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
374            version = get_test_component_version(self.config)
375            parser_instances = []
376            for parser in parsers:
377                parser_instance = parser.__class__()
378                parser_instance.suites_name = self.file_name
379                parser_instance.product_info.setdefault("Version", version)
380                parser_instance.listeners = request.listeners
381                parser_instances.append(parser_instance)
382            handler = ShellHandler(parser_instances)
383
384            reset_cmd = self._reset_device(request, source)
385            self.result = "%s.xml" % os.path.join(request.config.report_path,
386                                                  "result", self.file_name)
387
388            self.config.device.device.com_dict.get(
389                ComType.deploy_com).connect()
390
391            result, _, error = self.config.device.device. \
392                execute_command_with_timeout(
393                    command=reset_cmd,
394                    case_type=DeviceTestType.ctest_lite,
395                    key=ComType.deploy_com,
396                    timeout=90,
397                    receiver=handler)
398
399            device_log_file = get_device_log_file(request.config.report_path,
400                                                  request.config.device.
401                                                  __get_serial__())
402
403            device_log_file_open = os.open(device_log_file, os.O_WRONLY |
404                os.O_CREAT | os.O_APPEND, 0o755)
405            with os.fdopen(device_log_file_open, "a") as file_name:
406                file_name.write("{}{}".format(
407                    "\n".join(result.split("\n")[0:-1]), "\n"))
408                file_name.flush()
409        finally:
410            self.config.device.device.com_dict.get(ComType.deploy_com).close()
411
412    def _reset_device(self, request, source):
413        json_config = JsonParser(source)
414        reset_cmd = []
415        kit_instances = get_kit_instances(json_config,
416                                          request.config.resource_path,
417                                          request.config.testcases_path)
418        from xdevice import Scheduler
419
420        for (kit_instance, kit_info) in zip(kit_instances,
421                                            json_config.get_kits()):
422            if not isinstance(kit_instance, DeployKit):
423                continue
424            if not self.file_name:
425                self.file_name = get_config_value(
426                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
427            reset_cmd = kit_instance.burn_command
428            if not Scheduler.is_execute:
429                raise ExecuteTerminate("ExecuteTerminate", error_no="00300")
430
431            kit_instance.__setup__(self.config.device,
432                source_file=request.root.source.source_file.strip())
433
434        reset_cmd = [int(item, 16) for item in reset_cmd]
435        return reset_cmd
436
437    def __result__(self):
438        return self.result if os.path.exists(self.result) else ""
439
440
441
442@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
443class JSUnitTestLiteDriver(IDriver):
444    """
445    JSUnitTestDriver is a Test that runs a native test package on given device.
446    """
447
448    def __init__(self):
449        self.result = ""
450        self.error_message = ""
451        self.kits = []
452        self.config = None
453
454    def __check_environment__(self, device_options):
455        pass
456
457    def __check_config__(self, config):
458        pass
459
460    def _get_driver_config(self, json_config):
461        bundle_name = get_config_value('bundle-name',
462                                       json_config.get_driver(), False)
463        if not bundle_name:
464            raise ParamError("Can't find bundle-name in config file.",
465                             error_no="00108")
466        else:
467            self.config.bundle_name = bundle_name
468
469        ability = get_config_value('ability',
470                                   json_config.get_driver(), False)
471        if not ability:
472            self.config.ability = "default"
473        else:
474            self.config.ability = ability
475
476    def __execute__(self, request):
477        try:
478            LOG.debug("Start execute xdevice extension JSUnit Test")
479
480            self.config = request.config
481            self.config.device = request.config.environment.devices[0]
482
483            config_file = request.root.source.config_file
484            suite_file = request.root.source.source_file
485
486            if not suite_file:
487                raise ParamError(
488                    "test source '%s' not exists" %
489                    request.root.source.source_string, error_no="00101")
490
491            if not os.path.exists(config_file):
492                LOG.error("Error: Test cases don't exist %s." % config_file,
493                          error_no="00101")
494                raise ParamError(
495                    "Error: Test cases don't exist %s." % config_file,
496                    error_no="00101")
497
498            self.file_name = os.path.basename(
499                request.root.source.source_file.strip()).split(".")[0]
500
501            self.result = "%s.xml" % os.path.join(
502                request.config.report_path, "result", self.file_name)
503
504            json_config = JsonParser(config_file)
505            self.kits = get_kit_instances(json_config,
506                                          self.config.resource_path,
507                                          self.config.testcases_path)
508
509            self._get_driver_config(json_config)
510
511            from xdevice import Scheduler
512            for kit in self.kits:
513                if not Scheduler.is_execute:
514                    raise ExecuteTerminate("ExecuteTerminate",
515                                           error_no="00300")
516                if kit.__class__.__name__ == CKit.liteinstall:
517                    kit.bundle_name = self.config.bundle_name
518                kit.__setup__(self.config.device, request=request)
519
520            self._run_jsunit(request)
521
522        except Exception as exception:
523            self.error_message = exception
524        finally:
525            report_name = "report" if request.root.source. \
526                test_name.startswith("{") else get_filename_extension(
527                request.root.source.test_name)[0]
528
529            self.result = check_result_report(
530                request.config.report_path, self.result, self.error_message,
531                report_name)
532
533            for kit in self.kits:
534                kit.__teardown__(self.config.device)
535
536            self.config.device.close()
537
538    def _run_jsunit(self, request):
539        parser_instances = []
540        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
541        for parser in parsers:
542            parser_instance = parser.__class__()
543            parser_instance.suites_name = self.file_name
544            parser_instance.listeners = request.listeners
545            parser_instances.append(parser_instance)
546        handler = ShellHandler(parser_instances)
547
548        command = "./bin/aa start -p %s -n %s" % \
549                  (self.config.bundle_name, self.config.ability)
550        result, _, error = self.config.device.execute_command_with_timeout(
551            command=command,
552            timeout=300,
553            receiver=handler)
554
555        device_log_file = get_device_log_file(request.config.report_path,
556            request.config.device.
557            __get_serial__())
558
559        device_log_file_open = os.open(device_log_file, os.O_WRONLY |
560            os.O_CREAT | os.O_APPEND, 0o755)
561        with os.fdopen(device_log_file_open, "a") as file_name:
562            file_name.write("{}{}".format(
563                "\n".join(result.split("\n")[0:-1]), "\n"))
564            file_name.flush()
565
566    def __result__(self):
567        return self.result if os.path.exists(self.result) else ""