• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import glob
22import time
23import stat
24
25from _core.config.config_manager import UserConfigManager
26from _core.constants import ConfigConst
27from _core.constants import DeviceTestType
28from _core.constants import GTestConst
29from _core.constants import DeviceLabelType
30from _core.constants import ComType
31from _core.constants import ParserType
32from _core.constants import CKit
33from _core.constants import DeviceLiteKernel
34from _core.constants import FilePermission
35from _core.driver.parser_lite import ShellHandler
36from _core.environment.dmlib_lite import generate_report
37from _core.exception import ExecuteTerminate
38from _core.exception import LiteDeviceError
39from _core.exception import LiteDeviceExecuteCommandError
40from _core.exception import ParamError
41from _core.executor.listener import CollectingLiteGTestListener
42from _core.executor.listener import TestDescription
43from _core.interface import IDriver
44from _core.plugin import Plugin
45from _core.plugin import get_plugin
46from _core.logger import platform_logger
47from _core.report.reporter_helper import DataHelper
48from _core.testkit.json_parser import JsonParser
49from _core.testkit.kit_lite import DeployKit
50from _core.testkit.kit_lite import DeployToolKit
51from _core.utils import get_config_value
52from _core.utils import get_kit_instances
53from _core.utils import check_result_report
54from _core.utils import get_device_log_file
55from _core.utils import get_filename_extension
56from _core.utils import get_file_absolute_path
57from _core.utils import get_test_component_version
58from _core.report.suite_reporter import SuiteReporter
59
60__all__ = ["CppTestDriver", "CTestDriver", "init_remote_server"]
61LOG = platform_logger("DriversLite")
62FAILED_RUN_TEST_ATTEMPTS = 2
63CPP_TEST_MOUNT_STOP_SIGN = "not mount properly, Test Stop"
64CPP_TEST_NFS_SIGN = "execve: I/O error"
65
66
def get_nfs_server(request):
    """
    Return the "NfsServer" entry of the "testcases/server" user config.

    The user configuration is loaded from the config file and test
    environment named in the request.

    Raises:
        ParamError: (error_no 00403) when no matching entry is configured.
    """
    config_file = request.get(ConfigConst.configfile, "")
    environment = request.get(ConfigConst.test_environment, "")
    manager = UserConfigManager(config_file=config_file, env=environment)
    nfs_info = manager.get_user_config("testcases/server",
                                       filter_name="NfsServer")
    if nfs_info:
        return nfs_info
    err_msg = "The name of remote nfs server does not match"
    LOG.error(err_msg, error_no="00403")
    raise ParamError(err_msg, error_no="00403")
78
79
def init_remote_server(lite_instance, request=None):
    """
    Copy the "testcases/server" settings onto lite_instance.

    Sets the attributes linux_host, linux_port and linux_directory from the
    "ip", "port" and "dir" keys of the configured server entry.

    Raises:
        ParamError: (error_no 00108) when no server entry is configured.
    """
    manager = UserConfigManager(
        config_file=request.get(ConfigConst.configfile, ""),
        env=request.get(ConfigConst.test_environment, ""))
    server_info = manager.get_user_config("testcases/server")

    if not server_info:
        error_message = "nfs server does not exist, please " \
                        "check user_config.xml"
        raise ParamError(error_message, error_no="00108")

    for attribute, key in (("linux_host", "ip"),
                           ("linux_port", "port"),
                           ("linux_directory", "dir")):
        setattr(lite_instance, attribute, server_info.get(key))
95
96
def get_testcases(testcases_list):
    """
    Reduce "suite#case" style entries to the bare case names.

    An entry without "#" is kept unchanged, an entry with exactly one "#"
    contributes the part after it, and entries with more than one "#" are
    left out of the result.
    """
    selected = []
    for entry in testcases_list:
        parts = entry.split("#")
        if len(parts) > 2:
            continue
        # With a single part this is the entry itself.
        selected.append(parts[-1])
    return selected
106
107
def sort_by_length(file_name):
    """
    Sort key that orders file names by their number of characters.
    """
    name_length = len(file_name)
    return name_length
110
111
112@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test_lite)
113class CppTestDriver(IDriver):
114    """
115    CppTest is a test that runs a native test package on given lite device.
116    """
117    config = None
118    result = ""
119    error_message = ""
120
121    def __init__(self):
122        self.rerun = True
123        self.file_name = ""
124
125    def __check_environment__(self, device_options):
126        if len(device_options) != 1 or \
127                device_options[0].label != DeviceLabelType.ipcamera:
128            self.error_message = "check environment failed"
129            return False
130        return True
131
132    def __check_config__(self, config=None):
133        pass
134
135    def __execute__(self, request):
136        kits = []
137        device_log_file = get_device_log_file(
138            request.config.report_path,
139            request.get_devices()[0].__get_serial__())
140        try:
141            self.config = request.config
142            self.init_cpp_config()
143            self.config.device = request.config.environment.devices[0]
144            init_remote_server(self, request=request)
145            config_file = request.root.source.config_file
146            json_config = JsonParser(config_file)
147            self._get_driver_config(json_config)
148
149            bin_file = get_config_value('execute', json_config.get_driver(),
150                                        False)
151            kits = get_kit_instances(json_config,
152                                     request.config.resource_path,
153                                     request.config.testcases_path)
154            from xdevice import Scheduler
155            for kit in kits:
156                if not Scheduler.is_execute:
157                    raise ExecuteTerminate("ExecuteTerminate",
158                                           error_no="00300")
159                kit.__setup__(request.config.device, request=request)
160
161            command = self._get_execute_command(bin_file)
162
163            self.set_file_name(request, command)
164
165            if self.config.xml_output:
166                self.delete_device_xml(request, self.config.device_xml_path)
167            if os.path.exists(self.result):
168                os.remove(self.result)
169            if request.config.testargs.get("dry_run"):
170                self.config.dry_run = request.config.testargs.get(
171                    "dry_run")[0].lower()
172                self.dry_run(command, request.listeners)
173            else:
174                self.run_cpp_test(command, request)
175                self.generate_device_xml(request, self.execute_bin)
176
177        except (LiteDeviceError, Exception) as exception:
178            LOG.exception(exception, exc_info=False)
179            self.error_message = exception
180        finally:
181            device_log_file_open = os.open(device_log_file, os.O_WRONLY |
182                                           os.O_CREAT | os.O_APPEND,
183                                           FilePermission.mode_755)
184            with os.fdopen(device_log_file_open, "a") as file_name:
185                file_name.write(self.config.command_result)
186                file_name.flush()
187            LOG.info("-------------finally-----------------")
188            self._after_command(kits, request)
189
190    def _get_execute_command(self, bin_file):
191        if self.config.device.get("device_kernel") == \
192                DeviceLiteKernel.linux_kernel:
193            execute_dir = "/storage" + "/".join(bin_file.split("/")[0:-1])
194        else:
195            execute_dir = "/".join(bin_file.split("/")[0:-1])
196        self.execute_bin = bin_file.split("/")[-1]
197
198        self.config.device.execute_command_with_timeout(
199            command="cd {}".format(execute_dir), timeout=1)
200        self.config.execute_bin_path = execute_dir
201
202        if self.execute_bin.startswith("/"):
203            command = ".%s" % self.execute_bin
204        else:
205            command = "./%s" % self.execute_bin
206
207        report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0])
208        self.config.device_xml_path = (self.linux_directory + report_path).\
209            replace("//", "/")
210        self.config.device_report_path = execute_dir + report_path
211
212        return command
213
214    def _get_driver_config(self, json_config):
215        xml_output = get_config_value('xml-output',
216                                      json_config.get_driver(), False)
217
218        if isinstance(xml_output, bool):
219            self.config.xml_output = xml_output
220        elif str(xml_output).lower() == "false":
221            self.config.xml_output = False
222        else:
223            self.config.xml_output = True
224
225        rerun = get_config_value('rerun', json_config.get_driver(), False)
226        if isinstance(rerun, bool):
227            self.rerun = rerun
228        elif str(rerun).lower() == "false":
229            self.rerun = False
230        else:
231            self.rerun = True
232
233        timeout_config = get_config_value('timeout',
234                                              json_config.get_driver(), False)
235        if timeout_config:
236            self.config.timeout = int(timeout_config)
237        else:
238            self.config.timeout = 900
239
240    def _after_command(self, kits, request):
241        if self.config.device.get("device_kernel") == \
242                DeviceLiteKernel.linux_kernel:
243            self.config.device.execute_command_with_timeout(
244                command="cd /storage", timeout=1)
245        else:
246            self.config.device.execute_command_with_timeout(
247                command="cd /", timeout=1)
248        for kit in kits:
249            kit.__teardown__(request.config.device)
250        self.config.device.close()
251        self.delete_device_xml(request, self.linux_directory)
252
253        report_name = "report" if request.root.source. \
254            test_name.startswith("{") else get_filename_extension(
255            request.root.source.test_name)[0]
256        if not self.config.dry_run:
257            self.result = check_result_report(
258                request.config.report_path, self.result, self.error_message,
259                report_name)
260
261    def generate_device_xml(self, request, execute_bin):
262        if self.config.xml_output:
263            self.download_nfs_xml(request, self.config.device_xml_path)
264            self.merge_xml(execute_bin)
265
266    def dry_run(self, request, command, listener=None):
267        if self.config.xml_output:
268            collect_test_command = "%s --gtest_output=xml:%s " \
269                                   "--gtest_list_tests" % \
270                                   (command, self.config.device_report_path)
271            result, _, _ = self.config.device.execute_command_with_timeout(
272                command=collect_test_command,
273                case_type=DeviceTestType.cpp_test_lite,
274                timeout=15, receiver=None)
275            if CPP_TEST_MOUNT_STOP_SIGN in result:
276                tests = []
277                return tests
278            tests = self.read_nfs_xml(request, self.config.device_xml_path)
279            self.delete_device_xml(request, self.config.device_xml_path)
280            return tests
281
282        else:
283            parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_list_lite)
284            parser_instances = []
285            for parser in parsers:
286                parser_instance = parser.__class__()
287                parser_instance.suites_name = os.path.basename(self.result)
288                if listener:
289                    parser_instance.listeners = listener
290                parser_instances.append(parser_instance)
291            handler = ShellHandler(parser_instances)
292
293            collect_test_command = "%s --gtest_list_tests" % command
294            result, _, _ = self.config.device.execute_command_with_timeout(
295                command=collect_test_command,
296                case_type=DeviceTestType.cpp_test_lite,
297                timeout=15, receiver=handler)
298            self.config.command_result = "{}{}".format(
299                self.config.command_result, result)
300            if parser_instances[0].tests and \
301                    len(parser_instances[0].tests) > 0:
302                SuiteReporter.set_suite_list([item.test_name for item in
303                                              parser_instances[0].tests])
304            else:
305                SuiteReporter.set_suite_list([])
306            tests = parser_instances[0].tests
307        if not tests:
308            LOG.error("collect test failed!", error_no="00402")
309        return parser_instances[0].tests
310
311    def run_cpp_test(self, command, request):
312        if request.config.testargs.get("test"):
313            testcases_list = get_testcases(
314                request.config.testargs.get("test"))
315            for test in testcases_list:
316                command_case = "{} --gtest_filter=*{}".format(
317                    command, test)
318
319                if not self.config.xml_output:
320                    self.run(command_case, request.listeners, timeout=15)
321                else:
322                    command_case = "{} --gtest_output=xml:{}".format(
323                        command_case, self.config.device_report_path)
324                    self.run(command_case, None, timeout=15)
325        else:
326            self._do_test_run(command, request)
327
328    def init_cpp_config(self):
329        setattr(self.config, "command_result", "")
330        setattr(self.config, "device_xml_path", "")
331        setattr(self.config, "dry_run", False)
332
333    def merge_xml(self, execute_bin):
334        report_path = os.path.join(self.config.report_path, "result")
335        summary_result = DataHelper.get_summary_result(
336            report_path, self.result, key=sort_by_length,
337            file_prefix=execute_bin)
338        if summary_result:
339            SuiteReporter.append_report_result((
340                os.path.join(report_path, "%s.xml" % execute_bin),
341                DataHelper.to_string(summary_result)))
342        else:
343            self.error_message = "The test case did not generate XML"
344        for xml_file in os.listdir(os.path.split(self.result)[0]):
345            if not xml_file.startswith(execute_bin):
346                continue
347            if xml_file != os.path.split(self.result)[1]:
348                os.remove(os.path.join(os.path.split(
349                    self.result)[0], xml_file))
350
351    def set_file_name(self, request, command):
352        self.file_name = command.split(" ")[0].split("/")[-1].split(".")[0]
353        self.result = "%s.xml" % os.path.join(request.config.report_path,
354                                              "result", self.file_name)
355
356    def run(self, command=None, listener=None, timeout=None):
357        if not timeout:
358            timeout = self.config.timeout
359        if listener:
360            parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_lite)
361            parser_instances = []
362            for parser in parsers:
363                parser_instance = parser.__class__()
364                parser_instance.suite_name = self.file_name
365                parser_instance.listeners = listener
366                parser_instances.append(parser_instance)
367            handler = ShellHandler(parser_instances)
368        else:
369            handler = None
370        result, _, error = self.config.device.execute_command_with_timeout(
371            command=command, case_type=DeviceTestType.cpp_test_lite,
372            timeout=timeout, receiver=handler)
373        self.config.command_result += result
374        if result.count(CPP_TEST_NFS_SIGN) >= 1:
375            _, _, error = self.config.device.execute_command_with_timeout(
376                command="ping %s" % self.linux_host,
377                case_type=DeviceTestType.cpp_test_lite,
378                timeout=5)
379        return error, result, handler
380
381    def _do_test_run(self, command, request):
382        test_to_run = self._collect_test_to_run(request, command)
383        self._run_with_rerun(command, request, test_to_run)
384
385    def _run_with_rerun(self, command, request, expected_tests):
386        if self.config.xml_output:
387            self.run("{} --gtest_output=xml:{}".format(
388                command, self.config.device_report_path))
389            time.sleep(5)
390            test_rerun = True
391            if self.check_xml_exist(self.execute_bin + ".xml"):
392                test_rerun = False
393            test_run = self.read_nfs_xml(request, self.config.device_xml_path,
394                                         test_rerun)
395
396            if len(test_run) < len(expected_tests):
397                expected_tests = TestDescription.remove_test(expected_tests,
398                                                             test_run)
399                self._rerun_tests(command, expected_tests, None)
400        else:
401            test_tracker = CollectingLiteGTestListener()
402            listener = request.listeners
403            listener_copy = listener.copy()
404            listener_copy.append(test_tracker)
405            self.run(command, listener_copy)
406            test_run = test_tracker.get_current_run_results()
407            if len(test_run) != len(expected_tests):
408                expected_tests = TestDescription.remove_test(expected_tests,
409                                                             test_run)
410                self._rerun_tests(command, expected_tests, listener)
411
412    def _rerun_tests(self, command, expected_tests, listener):
413        if not expected_tests:
414            LOG.debug("No tests to re-run, all tests executed at least once.")
415        for test in expected_tests:
416            self._re_run(command, test, listener)
417
418    def _re_run(self, command, test, listener):
419        if self.config.xml_output:
420            _, _, handler = self.run("{} {}=*{} --gtest_output=xml:{}".format(
421                command, GTestConst.exec_para_filter, test.test_name,
422                self.config.device_report_path),
423                listener, timeout=15)
424        else:
425            handler = None
426            for _ in range(FAILED_RUN_TEST_ATTEMPTS):
427                try:
428                    listener_copy = listener.copy()
429                    test_tracker = CollectingLiteGTestListener()
430                    listener_copy.append(test_tracker)
431                    _, _, handler = self.run("{} {}=*{}".format(
432                        command, GTestConst.exec_para_filter, test.test_name),
433                        listener_copy, timeout=15)
434                    if len(test_tracker.get_current_run_results()):
435                        return
436                except LiteDeviceError as _:
437                    LOG.debug("Exception: ShellCommandUnresponsiveException")
438            handler.parsers[0].mark_test_as_failed(test)
439
440    def _collect_test_to_run(self, request, command):
441        if self.rerun:
442            tests = self.dry_run(request, command)
443            return tests
444        return []
445
446    def download_nfs_xml(self, request, report_path):
447        remote_nfs = get_nfs_server(request)
448        if not remote_nfs:
449            err_msg = "The name of remote device {} does not match". \
450                format(self.remote)
451            LOG.error(err_msg, error_no="00403")
452            raise TypeError(err_msg)
453        LOG.info("Trying to pull remote server: {}:{} report files to local "
454                 "in dir {}".format
455                 (remote_nfs.get("ip"), remote_nfs.get("port"),
456                  os.path.dirname(self.result)))
457        result_dir = os.path.join(request.config.report_path, "result")
458        os.makedirs(result_dir, exist_ok=True)
459        try:
460            if remote_nfs["remote"] == "true":
461                import paramiko
462                client = paramiko.Transport((remote_nfs.get("ip"),
463                                             int(remote_nfs.get("port"))))
464                client.connect(username=remote_nfs.get("username"),
465                               password=remote_nfs.get("password"))
466                sftp = paramiko.SFTPClient.from_transport(client)
467                files = sftp.listdir(report_path)
468
469                for report_xml in files:
470                    if report_xml.endswith(".xml"):
471                        filepath = report_path + report_xml
472                        try:
473                            sftp.get(remotepath=filepath,
474                                     localpath=os.path.join(os.path.split(
475                                         self.result)[0], report_xml))
476                        except IOError as error:
477                            LOG.error(error, error_no="00404")
478                client.close()
479            else:
480                if os.path.isdir(report_path):
481                    for report_xml in os.listdir(report_path):
482                        if report_xml.endswith(".xml"):
483                            filepath = report_path + report_xml
484                            shutil.copy(filepath,
485                                        os.path.join(os.path.split(
486                                            self.result)[0], report_xml))
487        except (FileNotFoundError, IOError) as error:
488            LOG.error("download xml failed %s" % error, error_no="00403")
489
490    def check_xml_exist(self, xml_file, timeout=60):
491        ls_command = "ls %s" % self.config.device_report_path
492        start_time = time.time()
493        while time.time() - start_time < timeout:
494            result, _, _ = self.config.device.execute_command_with_timeout(
495                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
496                timeout=5, receiver=None)
497            if xml_file in result:
498                return True
499            time.sleep(5)
500            if (self.execute_bin + "_1.xml") in result:
501                return False
502        return False
503
504    def read_nfs_xml(self, request, report_path, is_true=False):
505        remote_nfs = get_nfs_server(request)
506        if not remote_nfs:
507            err_msg = "The name of remote device {} does not match". \
508                format(self.remote)
509            LOG.error(err_msg, error_no="00403")
510            raise TypeError(err_msg)
511        tests = []
512        execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else (
513                self.execute_bin + ".xml")
514        LOG.debug("run into :{}".format(is_true))
515        file_path = os.path.join(report_path, execute_bin_xml)
516        if not self.check_xml_exist(execute_bin_xml):
517            return tests
518
519        from xml.etree import ElementTree
520        try:
521            if remote_nfs["remote"] == "true":
522                import paramiko
523                client = paramiko.SSHClient()
524                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
525                client.connect(hostname=remote_nfs.get("ip"),
526                               port=int(remote_nfs.get("port")),
527                               username=remote_nfs.get("username"),
528                               password=remote_nfs.get("password"))
529                sftp_client = client.open_sftp()
530                remote_file = sftp_client.open(file_path)
531                try:
532                    result = remote_file.read().decode()
533                    suites_element = ElementTree.fromstring(result)
534                    for suite_element in suites_element:
535                        suite_name = suite_element.get("name", "")
536                        for case in suite_element:
537                            case_name = case.get("name")
538                            test = TestDescription(suite_name, case_name)
539                            if test not in tests:
540                                tests.append(test)
541                finally:
542                    remote_file.close()
543                client.close()
544            else:
545                if os.path.isdir(report_path):
546                    flags = os.O_RDONLY
547                    modes = stat.S_IWUSR | stat.S_IRUSR
548                    with os.fdopen(os.open(file_path, flags, modes),
549                                   "r") as test_file:
550                        result = test_file.read()
551                        suites_element = ElementTree.fromstring(result)
552                        for suite_element in suites_element:
553                            suite_name = suite_element.get("name", "")
554                            for case in suite_element:
555                                case_name = case.get("name")
556                                test = TestDescription(suite_name, case_name)
557                                if test not in tests:
558                                    tests.append(test)
559        except (FileNotFoundError, IOError) as error:
560            LOG.error("download xml failed %s" % error, error_no="00403")
561        except SyntaxError as error:
562            LOG.error("parse xml failed %s" % error, error_no="00404")
563        return tests
564
565    def delete_device_xml(self, request, report_path):
566        remote_nfs = get_nfs_server(request)
567        if not remote_nfs:
568            err_msg = "The name of remote device {} does not match". \
569                format(self.remote)
570            LOG.error(err_msg, error_no="00403")
571            raise TypeError(err_msg)
572        LOG.info("delete xml directory {} from remote server: {}"
573                 "".format
574                 (report_path, remote_nfs.get("ip")))
575        if remote_nfs["remote"] == "true":
576            import paramiko
577            client = paramiko.Transport((remote_nfs.get("ip"),
578                                         int(remote_nfs.get("port"))))
579            client.connect(username=remote_nfs.get("username"),
580                           password=remote_nfs.get("password"))
581            sftp = paramiko.SFTPClient.from_transport(client)
582            try:
583                sftp.stat(report_path)
584                files = sftp.listdir(report_path)
585                for report_xml in files:
586                    if report_xml.endswith(".xml"):
587                        filepath = "{}{}".format(report_path, report_xml)
588                        try:
589                            sftp.remove(filepath)
590                            time.sleep(0.5)
591                        except IOError as _:
592                            pass
593            except FileNotFoundError as _:
594                pass
595            client.close()
596        else:
597            for report_xml in glob.glob(os.path.join(report_path, '*.xml')):
598                try:
599                    os.remove(report_xml)
600                except Exception as exception:
601                    LOG.error(
602                        "remove {} Failed:{}".format(report_xml, exception))
603                    pass
604
605    def __result__(self):
606        return self.result if os.path.exists(self.result) else ""
607
608
609@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
610class CTestDriver(IDriver):
611    """
612    CTest is a test that runs a native test package on given lite device.
613    """
614    config = None
615    result = ""
616    error_message = ""
617    version_cmd = "AT+CSV"
618
619    def __init__(self):
620        self.file_name = ""
621
622    def __check_environment__(self, device_options):
623        if len(device_options) != 1 or \
624                device_options[0].label != DeviceLabelType.wifiiot:
625            self.error_message = "check environment failed"
626            return False
627        return True
628
629    def __check_config__(self, config=None):
630        del config
631        self.config = None
632
633    def __execute__(self, request):
634        from xdevice import Variables
635        try:
636            self.config = request.config
637            self.config.device = request.config.environment.devices[0]
638            current_dir = request.config.resource_path if \
639                request.config.resource_path else Variables.exec_dir
640            if request.root.source.source_file.strip():
641                source = os.path.join(current_dir,
642                                      request.root.source.source_file.strip())
643                self.file_name = os.path.basename(
644                    request.root.source.source_file.strip()).split(".")[0]
645            else:
646                source = request.root.source.source_string.strip()
647            json_config = JsonParser(source)
648            kit_instances = get_kit_instances(json_config,
649                                              request.config.resource_path,
650                                              request.config.testcases_path)
651            for (kit_instance, kit_info) in zip(kit_instances,
652                                                json_config.get_kits()):
653                if isinstance(kit_instance, DeployToolKit):
654                    LOG.debug("run ctest third party")
655                    self._run_ctest_third_party(source=source, request=request,
656                                                time_out=int(
657                                                    kit_instance.time_out))
658                    break
659                else:
660                    LOG.debug("run ctest")
661                    self._run_ctest(source=source, request=request)
662                    break
663
664        except (LiteDeviceExecuteCommandError, Exception) as exception:
665            LOG.error(exception, error_no=getattr(exception, "error_no",
666                                                  "00000"))
667            self.error_message = exception
668        finally:
669            report_name = "report" if request.root.source. \
670                test_name.startswith("{") else get_filename_extension(
671                request.root.source.test_name)[0]
672            self.result = check_result_report(
673                request.config.report_path, self.result, self.error_message,
674                report_name)
675
676    def _run_ctest(self, source=None, request=None):
677        parser_instances = []
678        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
679        try:
680            if not source:
681                LOG.error("Error: source don't exist %s." % source,
682                          error_no="00101")
683                return
684
685            version = get_test_component_version(self.config)
686
687            for parser in parsers:
688                parser_instance = parser.__class__()
689                parser_instance.suites_name = self.file_name
690                parser_instance.product_info.setdefault("Version", version)
691                parser_instance.listeners = request.listeners
692                parser_instances.append(parser_instance)
693            handler = ShellHandler(parser_instances)
694
695            reset_cmd = self._reset_device(request, source)
696            self.result = "%s.xml" % os.path.join(
697                request.config.report_path, "result", self.file_name)
698            self.config.device.device.com_dict.get(
699                ComType.deploy_com).connect()
700            result, _, error = self.config.device.device. \
701                execute_command_with_timeout(
702                    command=reset_cmd, case_type=DeviceTestType.ctest_lite,
703                    key=ComType.deploy_com, timeout=90, receiver=handler)
704            device_log_file = get_device_log_file(request.config.report_path,
705                                                  request.config.device.
706                                                  __get_serial__())
707            device_log_file_open = \
708                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
709                        os.O_APPEND, FilePermission.mode_755)
710            with os.fdopen(device_log_file_open, "a") as file_name:
711                file_name.write("{}{}".format(
712                    "\n".join(result.split("\n")[0:-1]), "\n"))
713                file_name.flush()
714        finally:
715            self.config.device.device.com_dict.get(
716                ComType.deploy_com).close()
717
718    def _run_ctest_third_party(self, source=None, request=None, time_out=5):
719        parser_instances = []
720        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
721        try:
722            if not source:
723                LOG.error("Error: source don't exist %s." % source,
724                          error_no="00101")
725                return
726
727            version = get_test_component_version(self.config)
728
729            for parser in parsers:
730                parser_instance = parser.__class__()
731                parser_instance.suites_name = self.file_name
732                parser_instance.product_info.setdefault("Version", version)
733                parser_instance.listeners = request.listeners
734                parser_instances.append(parser_instance)
735            handler = ShellHandler(parser_instances)
736
737            while True:
738                input_burning = input("Please enter 'y' or 'n' after "
739                                      "the burning is complete,"
740                                      "enter 'quit' to exit:")
741                if input_burning.lower().strip() in ["y", "yes"]:
742                    LOG.info("Burning succeeded.")
743                    break
744                elif input_burning.lower().strip() in ["n", "no"]:
745                    LOG.info("Burning failed.")
746                elif input_burning.lower().strip() == "quit":
747                    break
748                else:
749                    LOG.info("The input {} parameter is incorrect,"
750                             "please enter 'y' or 'n' after the "
751                             "burning is complete ,enter 'quit' "
752                             "to exit.".format(input_burning))
753            LOG.info("Please press the device "
754                     "reset button when the send commmand [] appears ")
755            time.sleep(3)
756            self.result = "%s.xml" % os.path.join(
757                request.config.report_path, "result", self.file_name)
758            self.config.device.device.com_dict.get(
759                ComType.deploy_com).connect()
760            result, _, error = self.config.device.device. \
761                execute_command_with_timeout(
762                command=[], case_type=DeviceTestType.ctest_lite,
763                key=ComType.deploy_com, timeout=time_out, receiver=handler)
764            device_log_file = get_device_log_file(request.config.report_path,
765                                                  request.config.device.
766                                                  __get_serial__())
767            device_log_file_open = \
768                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
769                        os.O_APPEND, 0o755)
770            with os.fdopen(device_log_file_open, "a") as file_name:
771                file_name.write("{}{}".format(
772                    "\n".join(result.split("\n")[0:-1]), "\n"))
773                file_name.flush()
774        finally:
775            self.config.device.device.com_dict.get(
776                ComType.deploy_com).close()
777
778    def _reset_device(self, request, source):
779        json_config = JsonParser(source)
780        reset_cmd = []
781        kit_instances = get_kit_instances(json_config,
782                                          request.config.resource_path,
783                                          request.config.testcases_path)
784        from xdevice import Scheduler
785        for (kit_instance, kit_info) in zip(kit_instances,
786                                            json_config.get_kits()):
787            if not isinstance(kit_instance, DeployKit):
788                continue
789            if not self.file_name:
790                self.file_name = get_config_value(
791                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
792            reset_cmd = kit_instance.burn_command
793            if not Scheduler.is_execute:
794                raise ExecuteTerminate("ExecuteTerminate",
795                                       error_no="00300")
796            kit_instance.__setup__(
797                self.config.device)
798        reset_cmd = [int(item, 16) for item in reset_cmd]
799        return reset_cmd
800
801    def __result__(self):
802        return self.result if os.path.exists(self.result) else ""
803
804
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test)
class OpenSourceTestDriver(IDriver):
    """
    OpenSourceTestDriver runs the ".run-test" binaries returned by the
    configured kits on a single ipcamera device and parses their output
    with the open_source_test parsers.
    """
    config = None
    result = ""
    error_message = ""
    has_param = False

    def __init__(self):
        self.rerun = True
        self.file_name = ""
        self.handler = None

    def __check_environment__(self, device_options):
        """Return True only for exactly one device labelled ipcamera."""
        if len(device_options) != 1 or \
                device_options[0].label != DeviceLabelType.ipcamera:
            self.error_message = "check environment failed"
            return False
        return True

    def __check_config__(self, config=None):
        """No configuration check is done for this driver."""
        pass

    def __execute__(self, request):
        """
        Set up the kits, run every ".run-test" entry they return and write
        the command output to the device log file.

        Any exception is logged and kept in self.error_message. The kits
        are torn down, the device is closed and the result report is
        checked in every case.
        """
        kits = []
        # bound up front: with an empty kit list the setup loop never
        # assigns it and the run loop used to fail on an unbound name
        copy_list = []
        try:
            self.config = request.config
            # run() stores the output of the last command here
            setattr(self.config, "command_result", "")
            self.config.device = request.config.environment.devices[0]
            init_remote_server(self, request)
            config_file = request.root.source.config_file
            json_config = JsonParser(config_file)
            pre_cmd = get_config_value('pre_cmd', json_config.get_driver(),
                                       False)
            execute_dir = get_config_value('execute', json_config.get_driver(),
                                           False)
            kits = get_kit_instances(json_config,
                                     request.config.resource_path,
                                     request.config.testcases_path)
            from xdevice import Scheduler
            for kit in kits:
                if not Scheduler.is_execute:
                    raise ExecuteTerminate("ExecuteTerminate",
                                           error_no="00300")
                # only the list returned by the last kit is kept
                copy_list = kit.__setup__(request.config.device,
                                          request=request)

            self.file_name = request.root.source.test_name
            self.set_file_name(request, request.root.source.test_name)
            self.config.device.execute_command_with_timeout(
                command=pre_cmd, timeout=1)
            self.config.device.execute_command_with_timeout(
                command="cd {}".format(execute_dir), timeout=1)
            device_log_file = get_device_log_file(
                request.config.report_path,
                request.config.device.__get_serial__())
            device_log_file_open = \
                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
                        os.O_APPEND, FilePermission.mode_755)
            with os.fdopen(device_log_file_open, "a") as file_name:
                for test_bin in copy_list:
                    if not test_bin.endswith(".run-test"):
                        continue
                    # the binary is started relative to the directory
                    # entered by the "cd" command above
                    if test_bin.startswith("/"):
                        command = ".%s" % test_bin
                    else:
                        command = "./%s" % test_bin
                    self._do_test_run(command, request)
                    file_name.write(self.config.command_result)
                file_name.flush()

        except (LiteDeviceExecuteCommandError, Exception) as exception:
            LOG.error(exception, error_no=getattr(exception, "error_no",
                                                  "00000"))
            self.error_message = exception
        finally:
            LOG.info("-------------finally-----------------")
            # umount the dirs already mount
            for kit in kits:
                kit.__teardown__(request.config.device)
            self.config.device.close()
            report_name = "report" if request.root.source. \
                test_name.startswith("{") else get_filename_extension(
                    request.root.source.test_name)[0]
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message,
                report_name)

    def set_file_name(self, request, bin_file):
        """Store "<report_path>/result/<bin_file>.xml" in self.result."""
        self.result = "%s.xml" % os.path.join(
            request.config.report_path, "result", bin_file)

    def run(self, command=None, listener=None, timeout=20):
        """
        Execute command on the device, at most three times.

        The loop stops as soon as the output contains "pass" (compared in
        lower case). The output of the last attempt is stored in
        self.config.command_result.

        Returns the tuple (error, result, handler) of the last attempt.
        """
        parsers = get_plugin(Plugin.PARSER,
                             ParserType.open_source_test)
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suite_name = self.file_name
            parser_instance.test_name = command.replace("./", "")
            parser_instance.listeners = listener
            parser_instances.append(parser_instance)
        self.handler = ShellHandler(parser_instances)
        for _attempt in range(3):
            result, _, error = self.config.device.execute_command_with_timeout(
                command=command, case_type=DeviceTestType.open_source_test,
                timeout=timeout, receiver=self.handler)
            self.config.command_result = result
            if "pass" in result.lower():
                break
        return error, result, self.handler

    def _do_test_run(self, command, request):
        """Run command with the request listeners and log a failure."""
        listeners = request.listeners
        for listener in listeners:
            listener.device_sn = self.config.device.device_sn
        error, _, _ = self.run(command, listeners, timeout=60)
        if error:
            LOG.error(
                "execute %s failed" % command, error_no="00402")

    def __result__(self):
        """Return the report path when it exists on disk, otherwise ""."""
        return self.result if os.path.exists(self.result) else ""
931
932
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test)
class BuildOnlyTestDriver(IDriver):
    """
    BuildOnlyTestDriver collects the files named "logfile" below the
    configured log path and hands their content to the build_only_test
    parsers. No command is sent to the device by this class.
    """
    config = None
    result = ""
    error_message = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Read every log file found for the test and generate the report
        from the concatenated text. An error is logged and nothing is
        parsed when no log file is found.
        """
        self.config = request.config
        self.config.device = request.config.environment.devices[0]
        self.file_name = request.root.source.test_name
        self.config_file = request.root.source.config_file
        self.testcases_path = request.config.testcases_path

        log_files = self._get_result_list(self._get_log_file())
        if not log_files:
            LOG.error(
                "Error: source don't exist %s." % request.root.source.
                source_file, error_no="00101")
            return

        contents = []
        for log_file in log_files:
            read_flags = os.O_RDONLY
            read_modes = stat.S_IWUSR | stat.S_IRUSR
            log_fd = os.open(log_file, read_flags, read_modes)
            with os.fdopen(log_fd, "r", encoding="utf-8") as stream:
                text = stream.read()
                # every file contributes text ending with a line break
                if not text.endswith('\n'):
                    text = '%s\n' % text
            contents.append(text)
        total_result = "".join(contents)

        parser_instances = []
        for plugin in get_plugin(Plugin.PARSER, ParserType.build_only_test):
            instance = plugin.__class__()
            instance.suite_name = self.file_name
            instance.listeners = request.listeners
            parser_instances.append(instance)
        generate_report(ShellHandler(parser_instances), total_result)

    @classmethod
    def _get_result_list(cls, file_path):
        """Return the path of every file named "logfile" under file_path."""
        return [os.path.join(root_path, name)
                for root_path, _, names in os.walk(file_path)
                for name in names
                if name == "logfile"]

    def _get_log_file(self):
        """
        Resolve the 'log_path' entry of the driver config, with one leading
        "/" removed, against the testcases path.
        """
        driver_config = JsonParser(self.config_file).get_driver()
        log_path = get_config_value("log_path", driver_config, False)
        if log_path.startswith("/"):
            log_path = log_path.replace("/", "", 1)
        log_path = str(log_path)
        LOG.debug("The log path is:%s" % log_path)
        file_path = get_file_absolute_path(log_path,
                                           paths=[self.testcases_path])
        LOG.debug("The file path is:%s" % file_path)
        return file_path

    def __result__(self):
        """Return the report path when it exists on disk, otherwise ""."""
        if os.path.exists(self.result):
            return self.result
        return ""
1005
1006
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
class JSUnitTestLiteDriver(IDriver):
    """
    JSUnitTestLiteDriver starts the configured ability of a bundle on the
    device with "./bin/aa start" and passes the output to the
    jsuit_test_lite parsers.
    """

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        # set from the source file name in __execute__
        self.file_name = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def _get_driver_config(self, json_config):
        """
        Copy 'bundle-name' and 'ability' of the driver config to
        self.config; the ability falls back to "default".

        Raises ParamError when no bundle name is configured.
        """
        bundle_name = get_config_value('bundle-name',
                                       json_config.get_driver(), False)
        if not bundle_name:
            raise ParamError("Can't find bundle-name in config file.",
                             error_no="00108")
        self.config.bundle_name = bundle_name

        ability = get_config_value('ability',
                                   json_config.get_driver(), False)
        self.config.ability = ability or "default"

    def __execute__(self, request):
        """
        Validate the test source, set up the kits and run the JS unit test.

        Any exception is logged and kept in self.error_message. The result
        report is checked, the kits are torn down and the device is closed
        in every case.
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00101")

            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file,
                          error_no="00101")
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00101")

            self.file_name = os.path.basename(
                request.root.source.source_file.strip()).split(".")[0]

            self.result = "%s.xml" % os.path.join(
                request.config.report_path, "result", self.file_name)

            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)
            from xdevice import Scheduler
            for kit in self.kits:
                if not Scheduler.is_execute:
                    raise ExecuteTerminate("ExecuteTerminate",
                                           error_no="00300")
                # the install kit needs the bundle name read from the
                # driver config
                if kit.__class__.__name__ == CKit.liteinstall:
                    kit.bundle_name = self.config.bundle_name
                kit.__setup__(self.config.device, request=request)

            self._run_jsunit(request)

        except Exception as exception:
            # logged like in the other drivers of this file instead of
            # only being stored
            LOG.error(exception, error_no=getattr(exception, "error_no",
                                                  "00000"))
            self.error_message = exception
        finally:
            report_name = "report" if request.root.source. \
                test_name.startswith("{") else get_filename_extension(
                    request.root.source.test_name)[0]

            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message,
                report_name)

            for kit in self.kits:
                kit.__teardown__(self.config.device)
            self.config.device.close()

    def _run_jsunit(self, request):
        """
        Start the ability on the device and append the output, without its
        last line, to the device log file.
        """
        parser_instances = []
        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suites_name = self.file_name
            parser_instance.listeners = request.listeners
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)

        command = "./bin/aa start -p %s -n %s" % \
                  (self.config.bundle_name, self.config.ability)
        result, _, _ = self.config.device.execute_command_with_timeout(
            command=command, timeout=300, receiver=handler)
        device_log_file = get_device_log_file(
            request.config.report_path,
            request.config.device.__get_serial__())
        # same permission constant as the other drivers of this file
        device_log_file_open = \
            os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                    FilePermission.mode_755)
        with os.fdopen(device_log_file_open, "a") as file_name:
            file_name.write("{}{}".format(
                "\n".join(result.split("\n")[0:-1]), "\n"))
            file_name.flush()

    def __result__(self):
        """Return the report path when it exists on disk, otherwise ""."""
        return self.result if os.path.exists(self.result) else ""
1128