• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import glob
22import time
23import stat
24
25from xdevice import UserConfigManager
26from xdevice import ConfigConst
27from xdevice import ExecuteTerminate
28from xdevice import ParamError
29
30from xdevice import TestDescription
31from xdevice import IDriver
32from xdevice import Plugin
33from xdevice import get_plugin
34from xdevice import platform_logger
35from xdevice import DataHelper
36from xdevice import JsonParser
37from xdevice import get_config_value
38from xdevice import get_kit_instances
39from xdevice import check_result_report
40from xdevice import get_device_log_file
41from xdevice import get_filename_extension
42from xdevice import get_file_absolute_path
43from xdevice import get_test_component_version
44from xdevice import SuiteReporter
45from xdevice import ShellHandler
46from xdevice import FilePermission
47from xdevice import DeviceTestType
48from xdevice import GTestConst
49from xdevice import DeviceLabelType
50from ohos.constants import ComType
51from ohos.constants import ParserType
52from ohos.constants import DeviceLiteKernel
53from ohos.constants import CKit
54from ohos.exception import LiteDeviceError
55from ohos.exception import LiteDeviceExecuteCommandError
56from ohos.executor.listener import CollectingLiteGTestListener
57from ohos.testkit.kit_lite import DeployKit
58from ohos.testkit.kit_lite import DeployToolKit
59from ohos.environment.dmlib_lite import generate_report
60
61__all__ = ["CppTestDriver", "CTestDriver", "init_remote_server"]
62LOG = platform_logger("DriversLite")
63FAILED_RUN_TEST_ATTEMPTS = 2
64CPP_TEST_MOUNT_STOP_SIGN = "not mount properly, Test Stop"
65CPP_TEST_NFS_SIGN = "execve: I/O error"
66
67
68def get_nfs_server(request):
69    config_manager = UserConfigManager(
70        config_file=request.get(ConfigConst.configfile, ""),
71        env=request.get(ConfigConst.test_environment, ""))
72    remote_info = config_manager.get_user_config("testcases/server",
73                                                 filter_name="NfsServer")
74    if not remote_info:
75        err_msg = "The name of remote nfs server does not match"
76        LOG.error(err_msg, error_no="00403")
77        raise ParamError(err_msg, error_no="00403")
78    return remote_info
79
80
81def init_remote_server(lite_instance, request=None):
82    config_manager = UserConfigManager(
83        config_file=request.get(ConfigConst.configfile, ""),
84        env=request.get(ConfigConst.test_environment, ""))
85    linux_dict = config_manager.get_user_config("testcases/server")
86
87    if linux_dict:
88        setattr(lite_instance, "linux_host", linux_dict.get("ip"))
89        setattr(lite_instance, "linux_port", linux_dict.get("port"))
90        setattr(lite_instance, "linux_directory", linux_dict.get("dir"))
91
92    else:
93        error_message = "nfs server does not exist, please " \
94                        "check user_config.xml"
95        raise ParamError(error_message, error_no="00108")
96
97
98def get_testcases(testcases_list):
99    cases_list = []
100    for test in testcases_list:
101        test_item = test.split("#")
102        if len(test_item) == 1:
103            cases_list.append(test)
104        elif len(test_item) == 2:
105            cases_list.append(test_item[-1])
106    return cases_list
107
108
109def sort_by_length(file_name):
110    return len(file_name)
111
112
113@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test_lite)
114class CppTestDriver(IDriver):
115    """
116    CppTest is a test that runs a native test package on given lite device.
117    """
118    config = None
119    result = ""
120    error_message = ""
121
122    def __init__(self):
123        self.rerun = True
124        self.file_name = ""
125
126    def __check_environment__(self, device_options):
127        if len(device_options) != 1 or \
128                device_options[0].label != DeviceLabelType.ipcamera:
129            self.error_message = "check environment failed"
130            return False
131        return True
132
133    def __check_config__(self, config=None):
134        pass
135
136    def __execute__(self, request):
137        kits = []
138        device_log_file = get_device_log_file(
139            request.config.report_path,
140            request.get_devices()[0].__get_serial__())
141        try:
142            self.config = request.config
143            self.init_cpp_config()
144            self.config.device = request.config.environment.devices[0]
145            init_remote_server(self, request=request)
146            config_file = request.root.source.config_file
147            json_config = JsonParser(config_file)
148            self._get_driver_config(json_config)
149
150            bin_file = get_config_value('execute', json_config.get_driver(),
151                                        False)
152            kits = get_kit_instances(json_config,
153                                     request.config.resource_path,
154                                     request.config.testcases_path)
155            from xdevice import Scheduler
156            for kit in kits:
157                if not Scheduler.is_execute:
158                    raise ExecuteTerminate("ExecuteTerminate",
159                                           error_no="00300")
160                kit.__setup__(request.config.device, request=request)
161
162            command = self._get_execute_command(bin_file)
163
164            self.set_file_name(request, command)
165
166            if self.config.xml_output:
167                self.delete_device_xml(request, self.config.device_xml_path)
168            if os.path.exists(self.result):
169                os.remove(self.result)
170            if request.config.testargs.get("dry_run"):
171                self.config.dry_run = request.config.testargs.get(
172                    "dry_run")[0].lower()
173                self.dry_run(command, request.listeners)
174            else:
175                self.run_cpp_test(command, request)
176                self.generate_device_xml(request, self.execute_bin)
177
178        except (LiteDeviceError, Exception) as exception:
179            LOG.exception(exception, exc_info=False)
180            self.error_message = exception
181        finally:
182            device_log_file_open = os.open(device_log_file, os.O_WRONLY |
183                                           os.O_CREAT | os.O_APPEND,
184                                           FilePermission.mode_755)
185            with os.fdopen(device_log_file_open, "a") as file_name:
186                file_name.write(self.config.command_result)
187                file_name.flush()
188            LOG.info("-------------finally-----------------")
189            self._after_command(kits, request)
190
191    def _get_execute_command(self, bin_file):
192        if self.config.device.get("device_kernel") == \
193                DeviceLiteKernel.linux_kernel:
194            execute_dir = "/storage" + "/".join(bin_file.split("/")[0:-1])
195        else:
196            execute_dir = "/".join(bin_file.split("/")[0:-1])
197        self.execute_bin = bin_file.split("/")[-1]
198
199        self.config.device.execute_command_with_timeout(
200            command="cd {}".format(execute_dir), timeout=1)
201        self.config.execute_bin_path = execute_dir
202
203        if self.execute_bin.startswith("/"):
204            command = ".%s" % self.execute_bin
205        else:
206            command = "./%s" % self.execute_bin
207
208        report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0])
209        self.config.device_xml_path = (self.linux_directory + report_path). \
210            replace("//", "/")
211        self.config.device_report_path = execute_dir + report_path
212
213        return command
214
215    def _get_driver_config(self, json_config):
216        xml_output = get_config_value('xml-output',
217                                      json_config.get_driver(), False)
218
219        if isinstance(xml_output, bool):
220            self.config.xml_output = xml_output
221        elif str(xml_output).lower() == "false":
222            self.config.xml_output = False
223        else:
224            self.config.xml_output = True
225
226        rerun = get_config_value('rerun', json_config.get_driver(), False)
227        if isinstance(rerun, bool):
228            self.rerun = rerun
229        elif str(rerun).lower() == "false":
230            self.rerun = False
231        else:
232            self.rerun = True
233
234        timeout_config = get_config_value('timeout',
235                                          json_config.get_driver(), False)
236        if timeout_config:
237            self.config.timeout = int(timeout_config) // 1000
238        else:
239            self.config.timeout = 900
240
241    def _after_command(self, kits, request):
242        if self.config.device.get("device_kernel") == \
243                DeviceLiteKernel.linux_kernel:
244            self.config.device.execute_command_with_timeout(
245                command="cd /storage", timeout=1)
246        else:
247            self.config.device.execute_command_with_timeout(
248                command="cd /", timeout=1)
249        for kit in kits:
250            kit.__teardown__(request.config.device)
251        self.config.device.close()
252        self.delete_device_xml(request, self.linux_directory)
253
254        report_name = "report" if request.root.source. \
255            test_name.startswith("{") else get_filename_extension(
256            request.root.source.test_name)[0]
257        if not self.config.dry_run:
258            self.result = check_result_report(
259                request.config.report_path, self.result, self.error_message,
260                report_name)
261
262    def generate_device_xml(self, request, execute_bin):
263        if self.config.xml_output:
264            self.download_nfs_xml(request, self.config.device_xml_path)
265            self.merge_xml(execute_bin)
266
267    def dry_run(self, request, command, listener=None):
268        if self.config.xml_output:
269            collect_test_command = "%s --gtest_output=xml:%s " \
270                                   "--gtest_list_tests" % \
271                                   (command, self.config.device_report_path)
272            result, _, _ = self.config.device.execute_command_with_timeout(
273                command=collect_test_command,
274                case_type=DeviceTestType.cpp_test_lite,
275                timeout=15, receiver=None)
276            if CPP_TEST_MOUNT_STOP_SIGN in result:
277                tests = []
278                return tests
279            tests = self.read_nfs_xml(request, self.config.device_xml_path)
280            self.delete_device_xml(request, self.config.device_xml_path)
281            return tests
282
283        else:
284            parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_list_lite)
285            parser_instances = []
286            for parser in parsers:
287                parser_instance = parser.__class__()
288                parser_instance.suites_name = os.path.basename(self.result)
289                if listener:
290                    parser_instance.listeners = listener
291                parser_instances.append(parser_instance)
292            handler = ShellHandler(parser_instances)
293
294            collect_test_command = "%s --gtest_list_tests" % command
295            result, _, _ = self.config.device.execute_command_with_timeout(
296                command=collect_test_command,
297                case_type=DeviceTestType.cpp_test_lite,
298                timeout=15, receiver=handler)
299            self.config.command_result = "{}{}".format(
300                self.config.command_result, result)
301            if parser_instances[0].tests and \
302                    len(parser_instances[0].tests) > 0:
303                SuiteReporter.set_suite_list([item.test_name for item in
304                                              parser_instances[0].tests])
305            else:
306                SuiteReporter.set_suite_list([])
307            tests = parser_instances[0].tests
308        if not tests:
309            LOG.error("Collect test failed!", error_no="00402")
310        return parser_instances[0].tests
311
312    def run_cpp_test(self, command, request):
313        if request.config.testargs.get("test"):
314            testcases_list = get_testcases(
315                request.config.testargs.get("test"))
316            for test in testcases_list:
317                command_case = "{} --gtest_filter=*{}".format(
318                    command, test)
319
320                if not self.config.xml_output:
321                    self.run(command_case, request.listeners, timeout=15)
322                else:
323                    command_case = "{} --gtest_output=xml:{}".format(
324                        command_case, self.config.device_report_path)
325                    self.run(command_case, None, timeout=15)
326        else:
327            self._do_test_run(command, request)
328
329    def init_cpp_config(self):
330        setattr(self.config, "command_result", "")
331        setattr(self.config, "device_xml_path", "")
332        setattr(self.config, "dry_run", False)
333
334    def merge_xml(self, execute_bin):
335        report_path = os.path.join(self.config.report_path, "result")
336        summary_result = DataHelper.get_summary_result(
337            report_path, self.result, key=sort_by_length,
338            file_prefix=execute_bin)
339        if summary_result:
340            SuiteReporter.append_report_result((
341                os.path.join(report_path, "%s.xml" % execute_bin),
342                DataHelper.to_string(summary_result)))
343        else:
344            self.error_message = "The test case did not generate XML"
345        for xml_file in os.listdir(os.path.split(self.result)[0]):
346            if not xml_file.startswith(execute_bin):
347                continue
348            if xml_file != os.path.split(self.result)[1]:
349                os.remove(os.path.join(os.path.split(
350                    self.result)[0], xml_file))
351
352    def set_file_name(self, request, command):
353        self.file_name = command.split(" ")[0].split("/")[-1].split(".")[0]
354        self.result = "%s.xml" % os.path.join(request.config.report_path,
355                                              "result", self.file_name)
356
357    def run(self, command=None, listener=None, timeout=None):
358        if not timeout:
359            timeout = self.config.timeout
360        if listener:
361            parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_lite)
362            parser_instances = []
363            for parser in parsers:
364                parser_instance = parser.__class__()
365                parser_instance.suite_name = self.file_name
366                parser_instance.listeners = listener
367                parser_instances.append(parser_instance)
368            handler = ShellHandler(parser_instances)
369        else:
370            handler = None
371        result, _, error = self.config.device.execute_command_with_timeout(
372            command=command, case_type=DeviceTestType.cpp_test_lite,
373            timeout=timeout, receiver=handler)
374        self.config.command_result += result
375        if result.count(CPP_TEST_NFS_SIGN) >= 1:
376            _, _, error = self.config.device.execute_command_with_timeout(
377                command="ping %s" % self.linux_host,
378                case_type=DeviceTestType.cpp_test_lite,
379                timeout=5)
380        return error, result, handler
381
382    def _do_test_run(self, command, request):
383        test_to_run = self._collect_test_to_run(request, command)
384        self._run_with_rerun(command, request, test_to_run)
385
386    def _run_with_rerun(self, command, request, expected_tests):
387        if self.config.xml_output:
388            self.run("{} --gtest_output=xml:{}".format(
389                command, self.config.device_report_path))
390            time.sleep(5)
391            test_rerun = True
392            if self.check_xml_exist(self.execute_bin + ".xml"):
393                test_rerun = False
394            test_run = self.read_nfs_xml(request,
395                                         self.config.device_xml_path,
396                                         test_rerun)
397            if len(test_run) < len(expected_tests):
398                expected_tests = TestDescription.remove_test(expected_tests,
399                                                             test_run)
400                self._rerun_tests(command, expected_tests, None)
401        else:
402            test_tracker = CollectingLiteGTestListener()
403            listener = request.listeners
404            listener_copy = listener.copy()
405            listener_copy.append(test_tracker)
406            self.run(command, listener_copy)
407            test_run = test_tracker.get_current_run_results()
408            if len(test_run) != len(expected_tests):
409                expected_tests = TestDescription.remove_test(expected_tests,
410                                                             test_run)
411                self._rerun_tests(command, expected_tests, listener)
412
413    def _rerun_tests(self, command, expected_tests, listener):
414        if not expected_tests:
415            LOG.debug("No tests to re-run, all tests executed at least once.")
416        for test in expected_tests:
417            self._re_run(command, test, listener)
418
419    def _re_run(self, command, test, listener):
420        if self.config.xml_output:
421            _, _, handler = self.run("{} {}=*{} --gtest_output=xml:{}".format(
422                command, GTestConst.exec_para_filter, test.test_name,
423                self.config.device_report_path),
424                listener, timeout=15)
425        else:
426            handler = None
427            for _ in range(FAILED_RUN_TEST_ATTEMPTS):
428                try:
429                    listener_copy = listener.copy()
430                    test_tracker = CollectingLiteGTestListener()
431                    listener_copy.append(test_tracker)
432                    _, _, handler = self.run("{} {}=*{}".format(
433                        command, GTestConst.exec_para_filter, test.test_name),
434                        listener_copy, timeout=15)
435                    if len(test_tracker.get_current_run_results()):
436                        return
437                except LiteDeviceError as _:
438                    LOG.debug("Exception: ShellCommandUnresponsiveException")
439            handler.parsers[0].mark_test_as_failed(test)
440
441    def _collect_test_to_run(self, request, command):
442        if self.rerun:
443            tests = self.dry_run(request, command)
444            return tests
445        return []
446
447    def download_nfs_xml(self, request, report_path):
448        remote_nfs = get_nfs_server(request)
449        if not remote_nfs:
450            err_msg = "The name of remote device {} does not match". \
451                format(self.remote)
452            LOG.error(err_msg, error_no="00403")
453            raise TypeError(err_msg)
454        LOG.info("Trying to pull remote server: {}:{} report files to local "
455                 "in dir {}".format
456                 (remote_nfs.get("ip"), remote_nfs.get("port"),
457                  os.path.dirname(self.result)))
458        result_dir = os.path.join(request.config.report_path, "result")
459        os.makedirs(result_dir, exist_ok=True)
460        try:
461            if remote_nfs["remote"] == "true":
462                import paramiko
463                client = paramiko.Transport((remote_nfs.get("ip"),
464                                             int(remote_nfs.get("port"))))
465                client.connect(username=remote_nfs.get("username"),
466                               password=remote_nfs.get("password"))
467                sftp = paramiko.SFTPClient.from_transport(client)
468                files = sftp.listdir(report_path)
469
470                for report_xml in files:
471                    if report_xml.endswith(".xml"):
472                        filepath = report_path + report_xml
473                        try:
474                            sftp.get(remotepath=filepath,
475                                     localpath=os.path.join(os.path.split(
476                                         self.result)[0], report_xml))
477                        except IOError as error:
478                            LOG.error(error, error_no="00404")
479                client.close()
480            else:
481                if os.path.isdir(report_path):
482                    for report_xml in os.listdir(report_path):
483                        if report_xml.endswith(".xml"):
484                            filepath = report_path + report_xml
485                            shutil.copy(filepath,
486                                        os.path.join(os.path.split(
487                                            self.result)[0], report_xml))
488        except (FileNotFoundError, IOError) as error:
489            LOG.error("Download xml failed %s" % error, error_no="00403")
490
491    def check_xml_exist(self, xml_file, timeout=60):
492        ls_command = "ls %s" % self.config.device_report_path
493        start_time = time.time()
494        while time.time() - start_time < timeout:
495            result, _, _ = self.config.device.execute_command_with_timeout(
496                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
497                timeout=5, receiver=None)
498            if xml_file in result:
499                return True
500            time.sleep(5)
501            if (self.execute_bin + "_1.xml") in result:
502                return False
503        return False
504
505    def read_nfs_xml(self, request, report_path, is_true=False):
506        remote_nfs = get_nfs_server(request)
507        if not remote_nfs:
508            err_msg = "The name of remote device {} does not match". \
509                format(self.remote)
510            LOG.error(err_msg, error_no="00403")
511            raise TypeError(err_msg)
512        tests = []
513        execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else (
514                self.execute_bin + ".xml")
515        LOG.debug("run into :{}".format(is_true))
516        file_path = os.path.join(report_path, execute_bin_xml)
517        if not self.check_xml_exist(execute_bin_xml):
518            return tests
519
520        from xml.etree import ElementTree
521        try:
522            if remote_nfs["remote"] == "true":
523                import paramiko
524                client = paramiko.SSHClient()
525                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
526                client.connect(hostname=remote_nfs.get("ip"),
527                               port=int(remote_nfs.get("port")),
528                               username=remote_nfs.get("username"),
529                               password=remote_nfs.get("password"))
530                sftp_client = client.open_sftp()
531                remote_file = sftp_client.open(file_path)
532                try:
533                    result = remote_file.read().decode()
534                    suites_element = ElementTree.fromstring(result)
535                    for suite_element in suites_element:
536                        suite_name = suite_element.get("name", "")
537                        for case in suite_element:
538                            case_name = case.get("name")
539                            test = TestDescription(suite_name, case_name)
540                            if test not in tests:
541                                tests.append(test)
542                finally:
543                    remote_file.close()
544                client.close()
545            else:
546                if os.path.isdir(report_path):
547                    flags = os.O_RDONLY
548                    modes = stat.S_IWUSR | stat.S_IRUSR
549                    with os.fdopen(os.open(file_path, flags, modes),
550                                   "r") as test_file:
551                        result = test_file.read()
552                        suites_element = ElementTree.fromstring(result)
553                        for suite_element in suites_element:
554                            suite_name = suite_element.get("name", "")
555                            for case in suite_element:
556                                case_name = case.get("name")
557                                test = TestDescription(suite_name, case_name)
558                                if test not in tests:
559                                    tests.append(test)
560        except (FileNotFoundError, IOError) as error:
561            LOG.error("Download xml failed %s" % error, error_no="00403")
562        except SyntaxError as error:
563            LOG.error("Parse xml failed %s" % error, error_no="00404")
564        return tests
565
566    def delete_device_xml(self, request, report_path):
567        remote_nfs = get_nfs_server(request)
568        if not remote_nfs:
569            err_msg = "The name of remote device {} does not match". \
570                format(self.remote)
571            LOG.error(err_msg, error_no="00403")
572            raise TypeError(err_msg)
573        LOG.info("Delete xml directory {} from remote server: {}"
574                 "".format
575                 (report_path, remote_nfs.get("ip")))
576        if remote_nfs["remote"] == "true":
577            import paramiko
578            client = paramiko.Transport((remote_nfs.get("ip"),
579                                         int(remote_nfs.get("port"))))
580            client.connect(username=remote_nfs.get("username"),
581                           password=remote_nfs.get("password"))
582            sftp = paramiko.SFTPClient.from_transport(client)
583            try:
584                sftp.stat(report_path)
585                files = sftp.listdir(report_path)
586                for report_xml in files:
587                    if report_xml.endswith(".xml"):
588                        filepath = "{}{}".format(report_path, report_xml)
589                        try:
590                            sftp.remove(filepath)
591                            time.sleep(0.5)
592                        except IOError as _:
593                            pass
594            except FileNotFoundError as _:
595                pass
596            client.close()
597        else:
598            for report_xml in glob.glob(os.path.join(report_path, '*.xml')):
599                try:
600                    os.remove(report_xml)
601                except Exception as exception:
602                    LOG.error(
603                        "remove {} Failed:{}".format(report_xml, exception))
604                    pass
605
606    def __result__(self):
607        return self.result if os.path.exists(self.result) else ""
608
609
610@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
611class CTestDriver(IDriver):
612    """
613    CTest is a test that runs a native test package on given lite device.
614    """
615    config = None
616    result = ""
617    error_message = ""
618    version_cmd = "AT+CSV"
619
620    def __init__(self):
621        self.file_name = ""
622        self.run_third = False
623        self.kit_type = None
624        self.auto_deploy = None
625
626    def __check_environment__(self, device_options):
627        if len(device_options) != 1 or \
628                device_options[0].label != DeviceLabelType.wifiiot:
629            self.error_message = "check environment failed"
630            return False
631        return True
632
633    def __check_config__(self, config=None):
634        del config
635        self.config = None
636
637    def __execute__(self, request):
638        from xdevice import Variables
639        try:
640            self.config = request.config
641            self.config.device = request.config.environment.devices[0]
642            current_dir = request.config.resource_path if \
643                request.config.resource_path else Variables.exec_dir
644            if request.root.source.source_file.strip():
645                source = os.path.join(current_dir,
646                                      request.root.source.source_file.strip())
647                self.file_name = os.path.basename(
648                    request.root.source.source_file.strip()).split(".")[0]
649            else:
650                source = request.root.source.source_string.strip()
651            json_config = JsonParser(source)
652            kit_instances = get_kit_instances(json_config,
653                                              request.config.resource_path,
654                                              request.config.testcases_path)
655            for (_, kit_info) in zip(kit_instances, json_config.get_kits()):
656                self.auto_deploy = kit_info.get("auto_deploy", "")
657                self.kit_type = kit_info.get("type", "")
658                if self.kit_type == CKit.deploytool:
659                    self.run_third = True
660            LOG.info("Kit type:{}".format(self.kit_type))
661            LOG.info("Auto deploy:{}".format(self.auto_deploy))
662            LOG.info("Run third:{}".format(self.run_third))
663            if self.run_third:
664                LOG.info("Run the third-party vendor part")
665                if self.auto_deploy in ["False", "flase"]:
666                    self._run_ctest_third_party(source=source, request=request)
667                else:
668                    self._run_ctest_upgrade_party(source=source, request=request)
669            else:
670                LOG.debug("Run ctest")
671                self._run_ctest(source=source, request=request)
672
673        except (LiteDeviceExecuteCommandError, Exception) as exception:
674            LOG.error(exception, error_no=getattr(exception, "error_no",
675                                                  "00000"))
676            self.error_message = exception
677        finally:
678            report_name = "report" if request.root.source. \
679                test_name.startswith("{") else get_filename_extension(
680                request.root.source.test_name)[0]
681            self.result = check_result_report(
682                request.config.report_path, self.result, self.error_message,
683                report_name)
684
685    def _run_ctest(self, source=None, request=None, timeout=90):
686        parser_instances = []
687        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
688        try:
689            if not source:
690                LOG.error("Error: source don't exist %s." % source,
691                          error_no="00101")
692                return
693
694            version = get_test_component_version(self.config)
695
696            for parser in parsers:
697                parser_instance = parser.__class__()
698                parser_instance.suites_name = self.file_name
699                parser_instance.product_info.setdefault("Version", version)
700                parser_instance.listeners = request.listeners
701                parser_instances.append(parser_instance)
702            handler = ShellHandler(parser_instances)
703
704            reset_cmd = self._reset_device(request, source)
705            self.result = "%s.xml" % os.path.join(
706                request.config.report_path, "result", self.file_name)
707            self.config.device.device.com_dict.get(
708                ComType.deploy_com).connect()
709            result, _, error = self.config.device.device. \
710                execute_command_with_timeout(
711                    command=reset_cmd, case_type=DeviceTestType.ctest_lite,
712                    key=ComType.deploy_com, timeout=timeout, receiver=handler)
713            device_log_file = get_device_log_file(request.config.report_path,
714                                                  request.config.device.
715                                                  __get_serial__())
716            device_log_file_open = \
717                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
718                        os.O_APPEND, FilePermission.mode_755)
719            with os.fdopen(device_log_file_open, "a") as file_name:
720                file_name.write("{}{}".format(
721                    "\n".join(result.split("\n")[0:-1]), "\n"))
722                file_name.flush()
723        finally:
724            self.config.device.device.com_dict.get(
725                ComType.deploy_com).close()
726
727    def _run_ctest_third_party(self, source=None, request=None, timeout=5):
728        parser_instances = []
729        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
730        try:
731            if not source:
732                LOG.error("Error: source don't exist %s." % source,
733                          error_no="00101")
734                return
735
736            version = get_test_component_version(self.config)
737
738            for parser in parsers:
739                parser_instance = parser.__class__()
740                parser_instance.suites_name = self.file_name
741                parser_instance.product_info.setdefault("Version", version)
742                parser_instance.listeners = request.listeners
743                parser_instances.append(parser_instance)
744            handler = ShellHandler(parser_instances)
745
746            while True:
747                input_burning = input("Please enter 'y' or 'n' "
748                                      "after the burning  is complete,"
749                                      "enter 'quit' to exit:")
750                if input_burning.lower().strip() in ["y", "yes"]:
751                    LOG.info("Burning succeeded.")
752                    break
753                elif input_burning.lower().strip() in ["n", "no"]:
754                    LOG.info("Burning failed.")
755                elif input_burning.lower().strip() == "quit":
756                    break
757                else:
758                    LOG.info({"The input parameter is incorrect,please"
759                              " enter 'y' or 'n' after the burning is "
760                              "complete,enter 'quit' to exit."})
761            LOG.info("Please press the reset button on the device ")
762            time.sleep(5)
763            self.result = "%s.xml" % os.path.join(
764                request.config.report_path, "result", self.file_name)
765            self.config.device.device.com_dict.get(
766                ComType.deploy_com).connect()
767
768            LOG.debug("Device com:{}".format(self.config.device.device))
769            result, _, error = self.config.device.device. \
770                execute_command_with_timeout(
771                command=[], case_type=DeviceTestType.ctest_lite,
772                key=ComType.deploy_com, timeout=timeout, receiver=handler)
773            device_log_file = get_device_log_file(request.config.report_path,
774                                                  request.config.device.
775                                                  __get_serial__())
776            device_log_file_open = \
777                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
778                        os.O_APPEND, FilePermission.mode_755)
779            with os.fdopen(device_log_file_open, "a") as file_name:
780                file_name.write("{}{}".format(
781                    "\n".join(result.split("\n")[0:-1]), "\n"))
782                file_name.flush()
783        finally:
784            self.config.device.device.com_dict.get(
785                ComType.deploy_com).close()
786
787    def _run_ctest_upgrade_party(self, source=None, request=None, timeout=90):
788        parser_instances = []
789        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
790        try:
791            if not source:
792                LOG.error("Error: source don't exist %s." % source,
793                          error_no="00101")
794                return
795
796            version = get_test_component_version(self.config)
797
798            for parser in parsers:
799                parser_instance = parser.__class__()
800                parser_instance.suites_name = self.file_name
801                parser_instance.product_info.setdefault("Version", version)
802                parser_instance.listeners = request.listeners
803                parser_instances.append(parser_instance)
804            handler = ShellHandler(parser_instances)
805            result = self._reset_third_device(request, source)
806            self.result = "%s.xml" % os.path.join(
807                request.config.report_path, "result", self.file_name)
808            if isinstance(result, list):
809                self.config.device.device.com_dict.get(
810                    ComType.deploy_com).connect()
811                LOG.debug("Device com:{}".format(self.config.device.device))
812                result, _, error = self.config.device.device. \
813                    execute_command_with_timeout(
814                    command=request, case_type=DeviceTestType.ctest_lite,
815                    key=ComType.deploy_com, timeout=timeout, receiver=handler)
816            else:
817                handler.__read__(result)
818                handler.__done__()
819            device_log_file = get_device_log_file(request.config.report_path,
820                                                  request.config.device.
821                                                  __get_serial__())
822            device_log_file_open = \
823                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
824                        os.O_APPEND, FilePermission.mode_755)
825            with os.fdopen(device_log_file_open, "a") as file_name:
826                file_name.write("{}{}".format(
827                    "\n".join(result.split("\n")[0:-1]), "\n"))
828                file_name.flush()
829        finally:
830            self.config.device.device.com_dict.get(
831                ComType.deploy_com).close()
832
833    def _reset_device(self, request, source):
834        json_config = JsonParser(source)
835        reset_cmd = []
836        kit_instances = get_kit_instances(json_config,
837                                          request.config.resource_path,
838                                          request.config.testcases_path)
839        from xdevice import Scheduler
840        for (kit_instance, kit_info) in zip(kit_instances,
841                                            json_config.get_kits()):
842            if not isinstance(kit_instance, DeployKit):
843                continue
844            if not self.file_name:
845                self.file_name = get_config_value(
846                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
847            reset_cmd = kit_instance.burn_command
848            if not Scheduler.is_execute:
849                raise ExecuteTerminate("ExecuteTerminate",
850                                       error_no="00300")
851            kit_instance.__setup__(
852                self.config.device)
853        reset_cmd = [int(item, 16) for item in reset_cmd]
854        return reset_cmd
855
856    def _reset_third_device(self, request, source):
857        json_config = JsonParser(source)
858        reset_cmd = []
859        kit_instances = get_kit_instances(json_config,
860                                          request.config.resource_path,
861                                          request.config.testcases_path)
862        from xdevice import Scheduler
863        for (kit_instance, kit_info) in zip(kit_instances,
864                                            json_config.get_kits()):
865            if not isinstance(kit_instance, DeployToolKit):
866                continue
867            if not self.file_name:
868                self.file_name = get_config_value(
869                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
870            if not Scheduler.is_execute:
871                raise ExecuteTerminate("ExecuteTerminate",
872                                       error_no="00300")
873            reset_cmd = kit_instance.__setup__(
874                self.config.device)
875        return reset_cmd
876
877    def __result__(self):
878        return self.result if os.path.exists(self.result) else ""
879
880
881@Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test)
882class OpenSourceTestDriver(IDriver):
883    """
884    OpenSourceTest is a test that runs a native test package on given
885    device lite device.
886    """
887    config = None
888    result = ""
889    error_message = ""
890    has_param = False
891
892    def __init__(self):
893        self.rerun = True
894        self.file_name = ""
895        self.handler = None
896
897    def __check_environment__(self, device_options):
898        if len(device_options) != 1 or \
899                device_options[0].label != DeviceLabelType.ipcamera:
900            self.error_message = "check environment failed"
901            return False
902        return True
903
904    def __check_config__(self, config=None):
905        pass
906
907    def __execute__(self, request):
908        kits = []
909        try:
910            self.config = request.config
911            setattr(self.config, "command_result", "")
912            self.config.device = request.config.environment.devices[0]
913            init_remote_server(self, request)
914            config_file = request.root.source.config_file
915            json_config = JsonParser(config_file)
916            pre_cmd = get_config_value('pre_cmd', json_config.get_driver(),
917                                       False)
918            execute_dir = get_config_value('execute', json_config.get_driver(),
919                                           False)
920            kits = get_kit_instances(json_config,
921                                     request.config.resource_path,
922                                     request.config.testcases_path)
923            from xdevice import Scheduler
924            for kit in kits:
925                if not Scheduler.is_execute:
926                    raise ExecuteTerminate("ExecuteTerminate",
927                                           error_no="00300")
928                copy_list = kit.__setup__(request.config.device,
929                                          request=request)
930
931            self.file_name = request.root.source.test_name
932            self.set_file_name(request, request.root.source.test_name)
933            self.config.device.execute_command_with_timeout(
934                command=pre_cmd, timeout=1)
935            self.config.device.execute_command_with_timeout(
936                command="cd {}".format(execute_dir), timeout=1)
937            device_log_file = get_device_log_file(
938                request.config.report_path,
939                request.config.device.__get_serial__())
940            device_log_file_open = \
941                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
942                        os.O_APPEND, FilePermission.mode_755)
943            with os.fdopen(device_log_file_open, "a") as file_name:
944                for test_bin in copy_list:
945                    if not test_bin.endswith(".run-test"):
946                        continue
947                    if test_bin.startswith("/"):
948                        command = ".%s" % test_bin
949                    else:
950                        command = "./%s" % test_bin
951                    self._do_test_run(command, request)
952                    file_name.write(self.config.command_result)
953                file_name.flush()
954
955        except (LiteDeviceExecuteCommandError, Exception) as exception:
956            LOG.error(exception, error_no=getattr(exception, "error_no",
957                                                  "00000"))
958            self.error_message = exception
959        finally:
960            LOG.info("-------------finally-----------------")
961            # umount the dirs already mount
962            for kit in kits:
963                kit.__teardown__(request.config.device)
964            self.config.device.close()
965            report_name = "report" if request.root.source. \
966                test_name.startswith("{") else get_filename_extension(
967                    request.root.source.test_name)[0]
968            self.result = check_result_report(
969                request.config.report_path, self.result, self.error_message,
970                report_name)
971
972    def set_file_name(self, request, bin_file):
973        self.result = "%s.xml" % os.path.join(
974            request.config.report_path, "result", bin_file)
975
976    def run(self, command=None, listener=None, timeout=20):
977        parsers = get_plugin(Plugin.PARSER,
978                             ParserType.open_source_test)
979        parser_instances = []
980        for parser in parsers:
981            parser_instance = parser.__class__()
982            parser_instance.suite_name = self.file_name
983            parser_instance.test_name = command.replace("./", "")
984            parser_instance.listeners = listener
985            parser_instances.append(parser_instance)
986        self.handler = ShellHandler(parser_instances)
987        for _ in range(3):
988            result, _, error = self.config.device.execute_command_with_timeout(
989                command=command, case_type=DeviceTestType.open_source_test,
990                timeout=timeout, receiver=self.handler)
991            self.config.command_result = result
992            if "pass" in result.lower():
993                break
994        return error, result, self.handler
995
996    def _do_test_run(self, command, request):
997        listeners = request.listeners
998        for listener in listeners:
999            listener.device_sn = self.config.device.device_sn
1000        error, _, _ = self.run(command, listeners, timeout=60)
1001        if error:
1002            LOG.error(
1003                "Execute %s failed" % command, error_no="00402")
1004
1005    def __result__(self):
1006        return self.result if os.path.exists(self.result) else ""
1007
1008
1009@Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test)
1010class BuildOnlyTestDriver(IDriver):
1011    """
1012    BuildOnlyTest is a test that runs a native test package on given
1013    device lite device.
1014    """
1015    config = None
1016    result = ""
1017    error_message = ""
1018
1019    def __check_environment__(self, device_options):
1020        pass
1021
1022    def __check_config__(self, config):
1023        pass
1024
1025    def __execute__(self, request):
1026        self.config = request.config
1027        self.config.device = request.config.environment.devices[0]
1028        self.file_name = request.root.source.test_name
1029        self.config_file = request.root.source.config_file
1030        self.testcases_path = request.config.testcases_path
1031        file_path = self._get_log_file()
1032        result_list = self._get_result_list(file_path)
1033        if len(result_list) == 0:
1034            LOG.error(
1035                "Error: source don't exist %s." % request.root.source.
1036                source_file, error_no="00101")
1037            return
1038        total_result = ''
1039        for result in result_list:
1040            flags = os.O_RDONLY
1041            modes = stat.S_IWUSR | stat.S_IRUSR
1042            with os.fdopen(os.open(result, flags, modes), "r",
1043                           encoding="utf-8") as file_content:
1044                result = file_content.read()
1045                if not result.endswith('\n'):
1046                    result = '%s\n' % result
1047            total_result = '{}{}'.format(total_result, result)
1048        parsers = get_plugin(Plugin.PARSER, ParserType.build_only_test)
1049        parser_instances = []
1050        for parser in parsers:
1051            parser_instance = parser.__class__()
1052            parser_instance.suite_name = self.file_name
1053            parser_instance.listeners = request.listeners
1054            parser_instances.append(parser_instance)
1055        handler = ShellHandler(parser_instances)
1056        generate_report(handler, total_result)
1057
1058    @classmethod
1059    def _get_result_list(cls, file_path):
1060        result_list = list()
1061        for root_path, dirs_path, file_names in os.walk(file_path):
1062            for file_name in file_names:
1063                if file_name == "logfile":
1064                    result_list.append(os.path.join(root_path, file_name))
1065        return result_list
1066
1067    def _get_log_file(self):
1068        json_config = JsonParser(self.config_file)
1069        log_path = get_config_value('log_path', json_config.get_driver(),
1070                                    False)
1071        log_path = str(log_path.replace("/", "", 1)) if log_path.startswith(
1072            "/") else str(log_path)
1073        LOG.debug("The log path is:%s" % log_path)
1074        file_path = get_file_absolute_path(log_path,
1075                                           paths=[self.testcases_path])
1076        LOG.debug("The file path is:%s" % file_path)
1077        return file_path
1078
1079    def __result__(self):
1080        return self.result if os.path.exists(self.result) else ""
1081
1082
1083@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
1084class JSUnitTestLiteDriver(IDriver):
1085    """
1086    JSUnitTestDriver is a Test that runs a native test package on given device.
1087    """
1088
1089    def __init__(self):
1090        self.result = ""
1091        self.error_message = ""
1092        self.kits = []
1093        self.config = None
1094
1095    def __check_environment__(self, device_options):
1096        pass
1097
1098    def __check_config__(self, config):
1099        pass
1100
1101    def _get_driver_config(self, json_config):
1102        bundle_name = get_config_value('bundle-name',
1103                                       json_config.get_driver(), False)
1104        if not bundle_name:
1105            raise ParamError("Can't find bundle-name in config file.",
1106                             error_no="00108")
1107        else:
1108            self.config.bundle_name = bundle_name
1109
1110        ability = get_config_value('ability',
1111                                   json_config.get_driver(), False)
1112        if not ability:
1113            self.config.ability = "default"
1114        else:
1115            self.config.ability = ability
1116
1117    def __execute__(self, request):
1118        try:
1119            LOG.debug("Start execute xdevice extension JSUnit Test")
1120
1121            self.config = request.config
1122            self.config.device = request.config.environment.devices[0]
1123
1124            config_file = request.root.source.config_file
1125            suite_file = request.root.source.source_file
1126
1127            if not suite_file:
1128                raise ParamError(
1129                    "test source '%s' not exists" %
1130                    request.root.source.source_string, error_no="00101")
1131
1132            if not os.path.exists(config_file):
1133                LOG.error("Error: Test cases don't exist %s." % config_file,
1134                          error_no="00101")
1135                raise ParamError(
1136                    "Error: Test cases don't exist %s." % config_file,
1137                    error_no="00101")
1138
1139            self.file_name = os.path.basename(
1140                request.root.source.source_file.strip()).split(".")[0]
1141
1142            self.result = "%s.xml" % os.path.join(
1143                request.config.report_path, "result", self.file_name)
1144
1145            json_config = JsonParser(config_file)
1146            self.kits = get_kit_instances(json_config,
1147                                          self.config.resource_path,
1148                                          self.config.testcases_path)
1149
1150            self._get_driver_config(json_config)
1151            from xdevice import Scheduler
1152            for kit in self.kits:
1153                if not Scheduler.is_execute:
1154                    raise ExecuteTerminate("ExecuteTerminate",
1155                                           error_no="00300")
1156                if kit.__class__.__name__ == CKit.liteinstall:
1157                    kit.bundle_name = self.config.bundle_name
1158                kit.__setup__(self.config.device, request=request)
1159
1160            self._run_jsunit(request)
1161
1162        except Exception as exception:
1163            self.error_message = exception
1164        finally:
1165            report_name = "report" if request.root.source. \
1166                test_name.startswith("{") else get_filename_extension(
1167                request.root.source.test_name)[0]
1168
1169            self.result = check_result_report(
1170                request.config.report_path, self.result, self.error_message,
1171                report_name)
1172
1173            for kit in self.kits:
1174                kit.__teardown__(self.config.device)
1175            self.config.device.close()
1176
1177    def _run_jsunit(self, request):
1178        parser_instances = []
1179        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
1180        for parser in parsers:
1181            parser_instance = parser.__class__()
1182            parser_instance.suites_name = self.file_name
1183            parser_instance.listeners = request.listeners
1184            parser_instances.append(parser_instance)
1185        handler = ShellHandler(parser_instances)
1186
1187        command = "./bin/aa start -p %s -n %s" % \
1188                  (self.config.bundle_name, self.config.ability)
1189        result, _, error = self.config.device.execute_command_with_timeout(
1190            command=command, timeout=300, receiver=handler)
1191        device_log_file = get_device_log_file(request.config.report_path,
1192                                              request.config.device.
1193                                              __get_serial__())
1194        device_log_file_open =\
1195            os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
1196                    FilePermission.mode_755)
1197        with os.fdopen(device_log_file_open, "a") as file_name:
1198            file_name.write("{}{}".format(
1199                "\n".join(result.split("\n")[0:-1]), "\n"))
1200            file_name.flush()
1201
1202    def __result__(self):
1203        return self.result if os.path.exists(self.result) else ""
1204