• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import glob
22import time
23import stat
24
25from xdevice import UserConfigManager
26from xdevice import ConfigConst
27from xdevice import ExecuteTerminate
28from xdevice import ParamError
29
30from xdevice import TestDescription
31from xdevice import IDriver
32from xdevice import Plugin
33from xdevice import get_plugin
34from xdevice import platform_logger
35from xdevice import DataHelper
36from xdevice import JsonParser
37from xdevice import get_config_value
38from xdevice import get_kit_instances
39from xdevice import check_result_report
40from xdevice import get_device_log_file
41from xdevice import get_filename_extension
42from xdevice import get_file_absolute_path
43from xdevice import get_test_component_version
44from xdevice import SuiteReporter
45from xdevice import ShellHandler
46from xdevice import FilePermission
47from xdevice import DeviceTestType
48from xdevice import GTestConst
49from xdevice import DeviceLabelType
50from ohos.constants import ComType
51from ohos.constants import ParserType
52from ohos.constants import DeviceLiteKernel
53from ohos.constants import CKit
54from ohos.exception import LiteDeviceError
55from ohos.exception import LiteDeviceExecuteCommandError
56from ohos.executor.listener import CollectingLiteGTestListener
57from ohos.testkit.kit_lite import DeployKit
58from ohos.testkit.kit_lite import DeployToolKit
59from ohos.environment.dmlib_lite import generate_report
60
61__all__ = ["CppTestDriver", "CTestDriver", "init_remote_server"]
62LOG = platform_logger("DriversLite")
63FAILED_RUN_TEST_ATTEMPTS = 2
64CPP_TEST_MOUNT_STOP_SIGN = "not mount properly, Test Stop"
65CPP_TEST_NFS_SIGN = "execve: I/O error"
66
67
68def get_nfs_server(request):
69    config_manager = UserConfigManager(
70        config_file=request.get(ConfigConst.configfile, ""),
71        env=request.get(ConfigConst.test_environment, ""))
72    remote_info = config_manager.get_user_config("testcases/server",
73                                                 filter_name="NfsServer")
74    if not remote_info:
75        err_msg = "The name of remote nfs server does not match"
76        LOG.error(err_msg, error_no="00403")
77        raise ParamError(err_msg, error_no="00403")
78    return remote_info
79
80
81def init_remote_server(lite_instance, request=None):
82    config_manager = UserConfigManager(
83        config_file=request.get(ConfigConst.configfile, ""),
84        env=request.get(ConfigConst.test_environment, ""))
85    linux_dict = config_manager.get_user_config("testcases/server")
86
87    if linux_dict:
88        setattr(lite_instance, "linux_host", linux_dict.get("ip"))
89        setattr(lite_instance, "linux_port", linux_dict.get("port"))
90        setattr(lite_instance, "linux_directory", linux_dict.get("dir"))
91
92    else:
93        error_message = "nfs server does not exist, please " \
94                        "check user_config.xml"
95        raise ParamError(error_message, error_no="00108")
96
97
98def get_testcases(testcases_list):
99    cases_list = []
100    for test in testcases_list:
101        test_item = test.split("#")
102        if len(test_item) == 1:
103            cases_list.append(test)
104        elif len(test_item) == 2:
105            cases_list.append(test_item[-1])
106    return cases_list
107
108
109def sort_by_length(file_name):
110    return len(file_name)
111
112
113@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test_lite)
114class CppTestDriver(IDriver):
115    """
116    CppTest is a test that runs a native test package on given lite device.
117    """
118    config = None
119    result = ""
120    error_message = ""
121
122    def __init__(self):
123        self.rerun = True
124        self.file_name = ""
125
126    def __check_environment__(self, device_options):
127        if len(device_options) != 1 or \
128                device_options[0].label != DeviceLabelType.ipcamera:
129            self.error_message = "check environment failed"
130            return False
131        return True
132
133    def __check_config__(self, config=None):
134        pass
135
136    def __execute__(self, request):
137        kits = []
138        device_log_file = get_device_log_file(
139            request.config.report_path,
140            request.get_devices()[0].__get_serial__())
141        try:
142            self.config = request.config
143            self.init_cpp_config()
144            self.config.device = request.config.environment.devices[0]
145            init_remote_server(self, request=request)
146            config_file = request.root.source.config_file
147            json_config = JsonParser(config_file)
148            self._get_driver_config(json_config)
149
150            bin_file = get_config_value('execute', json_config.get_driver(),
151                                        False)
152            kits = get_kit_instances(json_config,
153                                     request.config.resource_path,
154                                     request.config.testcases_path)
155            from xdevice import Scheduler
156            for kit in kits:
157                if not Scheduler.is_execute:
158                    raise ExecuteTerminate("ExecuteTerminate",
159                                           error_no="00300")
160                kit.__setup__(request.config.device, request=request)
161
162            command = self._get_execute_command(bin_file)
163
164            self.set_file_name(request, command)
165
166            if self.config.xml_output:
167                self.delete_device_xml(request, self.config.device_xml_path)
168            if os.path.exists(self.result):
169                os.remove(self.result)
170            if request.config.testargs.get("dry_run"):
171                self.config.dry_run = request.config.testargs.get(
172                    "dry_run")[0].lower()
173                self.dry_run(command, request.listeners)
174            else:
175                self.run_cpp_test(command, request)
176                self.generate_device_xml(request, self.execute_bin)
177
178        except (LiteDeviceError, Exception) as exception:
179            LOG.exception(exception, exc_info=False)
180            self.error_message = exception
181        finally:
182            device_log_file_open = os.open(device_log_file, os.O_WRONLY |
183                                           os.O_CREAT | os.O_APPEND,
184                                           FilePermission.mode_755)
185            with os.fdopen(device_log_file_open, "a") as file_name:
186                file_name.write(self.config.command_result)
187                file_name.flush()
188            LOG.info("-------------finally-----------------")
189            self._after_command(kits, request)
190        self.result = check_result_report(
191            request.config.report_path, self.result, self.error_message, request=request)
192
193    def _get_execute_command(self, bin_file):
194        if self.config.device.get("device_kernel") == \
195                DeviceLiteKernel.linux_kernel:
196            execute_dir = "/storage" + "/".join(bin_file.split("/")[0:-1])
197        else:
198            execute_dir = "/".join(bin_file.split("/")[0:-1])
199        self.execute_bin = bin_file.split("/")[-1]
200
201        self.config.device.execute_command_with_timeout(
202            command="cd {}".format(execute_dir), timeout=1)
203        self.config.execute_bin_path = execute_dir
204
205        if self.execute_bin.startswith("/"):
206            command = ".%s" % self.execute_bin
207        else:
208            command = "./%s" % self.execute_bin
209
210        report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0])
211        self.config.device_xml_path = (self.linux_directory + report_path). \
212            replace("//", "/")
213        self.config.device_report_path = execute_dir + report_path
214
215        return command
216
217    def _get_driver_config(self, json_config):
218        xml_output = get_config_value('xml-output',
219                                      json_config.get_driver(), False)
220
221        if isinstance(xml_output, bool):
222            self.config.xml_output = xml_output
223        elif str(xml_output).lower() == "false":
224            self.config.xml_output = False
225        else:
226            self.config.xml_output = True
227
228        rerun = get_config_value('rerun', json_config.get_driver(), False)
229        if isinstance(rerun, bool):
230            self.rerun = rerun
231        elif str(rerun).lower() == "false":
232            self.rerun = False
233        else:
234            self.rerun = True
235
236        timeout_config = get_config_value('timeout',
237                                          json_config.get_driver(), False)
238        if timeout_config:
239            self.config.timeout = int(timeout_config) // 1000
240        else:
241            self.config.timeout = 900
242
243    def _after_command(self, kits, request):
244        if self.config.device.get("device_kernel") == \
245                DeviceLiteKernel.linux_kernel:
246            self.config.device.execute_command_with_timeout(
247                command="cd /storage", timeout=1)
248        else:
249            self.config.device.execute_command_with_timeout(
250                command="cd /", timeout=1)
251        for kit in kits:
252            kit.__teardown__(request.config.device)
253        self.config.device.close()
254        self.delete_device_xml(request, self.linux_directory)
255
256        report_name = "report" if request.root.source. \
257            test_name.startswith("{") else get_filename_extension(
258            request.root.source.test_name)[0]
259        if not self.config.dry_run:
260            self.result = check_result_report(
261                request.config.report_path, self.result, self.error_message,
262                report_name)
263
264    def generate_device_xml(self, request, execute_bin):
265        if self.config.xml_output:
266            self.download_nfs_xml(request, self.config.device_xml_path)
267            self.merge_xml(execute_bin)
268
269    def dry_run(self, request, command, listener=None):
270        if self.config.xml_output:
271            collect_test_command = "%s --gtest_output=xml:%s " \
272                                   "--gtest_list_tests" % \
273                                   (command, self.config.device_report_path)
274            result, _, _ = self.config.device.execute_command_with_timeout(
275                command=collect_test_command,
276                case_type=DeviceTestType.cpp_test_lite,
277                timeout=15, receiver=None)
278            if CPP_TEST_MOUNT_STOP_SIGN in result:
279                tests = []
280                return tests
281            tests = self.read_nfs_xml(request, self.config.device_xml_path)
282            self.delete_device_xml(request, self.config.device_xml_path)
283            return tests
284
285        else:
286            parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_list_lite)
287            parser_instances = []
288            for parser in parsers:
289                parser_instance = parser.__class__()
290                parser_instance.suites_name = os.path.basename(self.result)
291                if listener:
292                    parser_instance.listeners = listener
293                parser_instances.append(parser_instance)
294            handler = ShellHandler(parser_instances)
295
296            collect_test_command = "%s --gtest_list_tests" % command
297            result, _, _ = self.config.device.execute_command_with_timeout(
298                command=collect_test_command,
299                case_type=DeviceTestType.cpp_test_lite,
300                timeout=15, receiver=handler)
301            self.config.command_result = "{}{}".format(
302                self.config.command_result, result)
303            if parser_instances[0].tests and \
304                    len(parser_instances[0].tests) > 0:
305                SuiteReporter.set_suite_list([item.test_name for item in
306                                              parser_instances[0].tests])
307            else:
308                SuiteReporter.set_suite_list([])
309            tests = parser_instances[0].tests
310        if not tests:
311            LOG.error("Collect test failed!", error_no="00402")
312        return parser_instances[0].tests
313
314    def run_cpp_test(self, command, request):
315        if request.config.testargs.get("test"):
316            testcases_list = get_testcases(
317                request.config.testargs.get("test"))
318            for test in testcases_list:
319                command_case = "{} --gtest_filter=*{}".format(
320                    command, test)
321
322                if not self.config.xml_output:
323                    self.run(command_case, request.listeners, timeout=15)
324                else:
325                    command_case = "{} --gtest_output=xml:{}".format(
326                        command_case, self.config.device_report_path)
327                    self.run(command_case, None, timeout=15)
328        else:
329            self._do_test_run(command, request)
330
331    def init_cpp_config(self):
332        setattr(self.config, "command_result", "")
333        setattr(self.config, "device_xml_path", "")
334        setattr(self.config, "dry_run", False)
335
336    def merge_xml(self, execute_bin):
337        report_path = os.path.join(self.config.report_path, "result")
338        summary_result = DataHelper.get_summary_result(
339            report_path, self.result, key=sort_by_length,
340            file_prefix=execute_bin)
341        if summary_result:
342            SuiteReporter.append_report_result((
343                os.path.join(report_path, "%s.xml" % execute_bin),
344                DataHelper.to_string(summary_result)))
345        else:
346            self.error_message = "The test case did not generate XML"
347        for xml_file in os.listdir(os.path.split(self.result)[0]):
348            if not xml_file.startswith(execute_bin):
349                continue
350            if xml_file != os.path.split(self.result)[1]:
351                os.remove(os.path.join(os.path.split(
352                    self.result)[0], xml_file))
353
354    def set_file_name(self, request, command):
355        self.file_name = command.split(" ")[0].split("/")[-1].split(".")[0]
356        self.result = "%s.xml" % os.path.join(request.config.report_path,
357                                              "result", self.file_name)
358
359    def run(self, command=None, listener=None, timeout=None):
360        if not timeout:
361            timeout = self.config.timeout
362        parser_instances = []
363        if listener:
364            parsers = get_plugin(Plugin.PARSER, ParserType.cpp_test_lite)
365            for parser in parsers:
366                parser_instance = parser.__class__()
367                parser_instance.suite_name = self.file_name
368                parser_instance.listeners = listener
369                parser_instances.append(parser_instance)
370            handler = ShellHandler(parser_instances)
371        else:
372            handler = None
373        result, _, error = self.config.device.execute_command_with_timeout(
374            command=command, case_type=DeviceTestType.cpp_test_lite,
375            timeout=timeout, receiver=handler)
376        self.config.command_result += result
377        if result.count(CPP_TEST_NFS_SIGN) >= 1:
378            _, _, error = self.config.device.execute_command_with_timeout(
379                command="ping %s" % self.linux_host,
380                case_type=DeviceTestType.cpp_test_lite,
381                timeout=5)
382        device = self.config.device
383        for parser_instance in parser_instances:
384            if hasattr(parser_instance, "product_info"):
385                product_info = parser_instance.product_info
386                device.update_device_props(product_info)
387        return error, result, handler
388
389    def _do_test_run(self, command, request):
390        test_to_run = self._collect_test_to_run(request, command)
391        self._run_with_rerun(command, request, test_to_run)
392
393    def _run_with_rerun(self, command, request, expected_tests):
394        if self.config.xml_output:
395            self.run("{} --gtest_output=xml:{}".format(
396                command, self.config.device_report_path))
397            time.sleep(5)
398            test_rerun = True
399            if self.check_xml_exist(self.execute_bin + ".xml"):
400                test_rerun = False
401            test_run = self.read_nfs_xml(request,
402                                         self.config.device_xml_path,
403                                         test_rerun)
404            if len(test_run) < len(expected_tests):
405                expected_tests = TestDescription.remove_test(expected_tests,
406                                                             test_run)
407                self._rerun_tests(command, expected_tests, None)
408        else:
409            test_tracker = CollectingLiteGTestListener()
410            listener = request.listeners
411            listener_copy = listener.copy()
412            listener_copy.append(test_tracker)
413            self.run(command, listener_copy)
414            test_run = test_tracker.get_current_run_results()
415            if len(test_run) != len(expected_tests):
416                expected_tests = TestDescription.remove_test(expected_tests,
417                                                             test_run)
418                self._rerun_tests(command, expected_tests, listener)
419
420    def _rerun_tests(self, command, expected_tests, listener):
421        if not expected_tests:
422            LOG.debug("No tests to re-run, all tests executed at least once.")
423        for test in expected_tests:
424            self._re_run(command, test, listener)
425
426    def _re_run(self, command, test, listener):
427        if self.config.xml_output:
428            _, _, handler = self.run("{} {}=*{} --gtest_output=xml:{}".format(
429                command, GTestConst.exec_para_filter, test.test_name,
430                self.config.device_report_path),
431                listener, timeout=15)
432        else:
433            handler = None
434            for _ in range(FAILED_RUN_TEST_ATTEMPTS):
435                try:
436                    listener_copy = listener.copy()
437                    test_tracker = CollectingLiteGTestListener()
438                    listener_copy.append(test_tracker)
439                    _, _, handler = self.run("{} {}=*{}".format(
440                        command, GTestConst.exec_para_filter, test.test_name),
441                        listener_copy, timeout=15)
442                    if len(test_tracker.get_current_run_results()):
443                        return
444                except LiteDeviceError as _:
445                    LOG.debug("Exception: ShellCommandUnresponsiveException")
446            handler.parsers[0].mark_test_as_failed(test)
447
448    def _collect_test_to_run(self, request, command):
449        if self.rerun:
450            tests = self.dry_run(request, command)
451            return tests
452        return []
453
454    def download_nfs_xml(self, request, report_path):
455        remote_nfs = get_nfs_server(request)
456        if not remote_nfs:
457            err_msg = "The name of remote device {} does not match". \
458                format(self.remote)
459            LOG.error(err_msg, error_no="00403")
460            raise TypeError(err_msg)
461        LOG.info("Trying to pull remote server: {}:{} report files to local "
462                 "in dir {}".format
463                 (remote_nfs.get("ip"), remote_nfs.get("port"),
464                  os.path.dirname(self.result)))
465        result_dir = os.path.join(request.config.report_path, "result")
466        os.makedirs(result_dir, exist_ok=True)
467        try:
468            if remote_nfs["remote"] == "true":
469                import paramiko
470                client = paramiko.Transport((remote_nfs.get("ip"),
471                                             int(remote_nfs.get("port"))))
472                client.connect(username=remote_nfs.get("username"),
473                               password=remote_nfs.get("password"))
474                sftp = paramiko.SFTPClient.from_transport(client)
475                files = sftp.listdir(report_path)
476
477                for report_xml in files:
478                    if report_xml.endswith(".xml"):
479                        filepath = report_path + report_xml
480                        try:
481                            sftp.get(remotepath=filepath,
482                                     localpath=os.path.join(os.path.split(
483                                         self.result)[0], report_xml))
484                        except IOError as error:
485                            LOG.error(error, error_no="00404")
486                client.close()
487            else:
488                if os.path.isdir(report_path):
489                    for report_xml in os.listdir(report_path):
490                        if report_xml.endswith(".xml"):
491                            filepath = report_path + report_xml
492                            shutil.copy(filepath,
493                                        os.path.join(os.path.split(
494                                            self.result)[0], report_xml))
495        except (FileNotFoundError, IOError) as error:
496            LOG.error("Download xml failed %s" % error, error_no="00403")
497
498    def check_xml_exist(self, xml_file, timeout=60):
499        ls_command = "ls %s" % self.config.device_report_path
500        start_time = time.time()
501        while time.time() - start_time < timeout:
502            result, _, _ = self.config.device.execute_command_with_timeout(
503                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
504                timeout=5, receiver=None)
505            if xml_file in result:
506                return True
507            time.sleep(5)
508            if (self.execute_bin + "_1.xml") in result:
509                return False
510        return False
511
512    def read_nfs_xml(self, request, report_path, is_true=False):
513        remote_nfs = get_nfs_server(request)
514        if not remote_nfs:
515            err_msg = "The name of remote device {} does not match". \
516                format(self.remote)
517            LOG.error(err_msg, error_no="00403")
518            raise TypeError(err_msg)
519        tests = []
520        execute_bin_xml = (self.execute_bin + "_1.xml") if is_true else (
521                self.execute_bin + ".xml")
522        LOG.debug("run into :{}".format(is_true))
523        file_path = os.path.join(report_path, execute_bin_xml)
524        if not self.check_xml_exist(execute_bin_xml):
525            return tests
526
527        from xml.etree import ElementTree
528        try:
529            if remote_nfs["remote"] == "true":
530                import paramiko
531                client = paramiko.SSHClient()
532                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
533                client.connect(hostname=remote_nfs.get("ip"),
534                               port=int(remote_nfs.get("port")),
535                               username=remote_nfs.get("username"),
536                               password=remote_nfs.get("password"))
537                sftp_client = client.open_sftp()
538                remote_file = sftp_client.open(file_path)
539                try:
540                    result = remote_file.read().decode()
541                    suites_element = ElementTree.fromstring(result)
542                    for suite_element in suites_element:
543                        suite_name = suite_element.get("name", "")
544                        for case in suite_element:
545                            case_name = case.get("name")
546                            test = TestDescription(suite_name, case_name)
547                            if test not in tests:
548                                tests.append(test)
549                finally:
550                    remote_file.close()
551                client.close()
552            else:
553                if os.path.isdir(report_path):
554                    flags = os.O_RDONLY
555                    modes = stat.S_IWUSR | stat.S_IRUSR
556                    with os.fdopen(os.open(file_path, flags, modes),
557                                   "r") as test_file:
558                        result = test_file.read()
559                        suites_element = ElementTree.fromstring(result)
560                        for suite_element in suites_element:
561                            suite_name = suite_element.get("name", "")
562                            for case in suite_element:
563                                case_name = case.get("name")
564                                test = TestDescription(suite_name, case_name)
565                                if test not in tests:
566                                    tests.append(test)
567        except (FileNotFoundError, IOError) as error:
568            LOG.error("Download xml failed %s" % error, error_no="00403")
569        except SyntaxError as error:
570            LOG.error("Parse xml failed %s" % error, error_no="00404")
571        return tests
572
573    def delete_device_xml(self, request, report_path):
574        remote_nfs = get_nfs_server(request)
575        if not remote_nfs:
576            err_msg = "The name of remote device {} does not match". \
577                format(self.remote)
578            LOG.error(err_msg, error_no="00403")
579            raise TypeError(err_msg)
580        LOG.info("Delete xml directory {} from remote server: {}"
581                 "".format
582                 (report_path, remote_nfs.get("ip")))
583        if remote_nfs["remote"] == "true":
584            import paramiko
585            client = paramiko.Transport((remote_nfs.get("ip"),
586                                         int(remote_nfs.get("port"))))
587            client.connect(username=remote_nfs.get("username"),
588                           password=remote_nfs.get("password"))
589            sftp = paramiko.SFTPClient.from_transport(client)
590            try:
591                sftp.stat(report_path)
592                files = sftp.listdir(report_path)
593                for report_xml in files:
594                    if report_xml.endswith(".xml"):
595                        filepath = "{}{}".format(report_path, report_xml)
596                        try:
597                            sftp.remove(filepath)
598                            time.sleep(0.5)
599                        except IOError as _:
600                            pass
601            except FileNotFoundError as _:
602                pass
603            client.close()
604        else:
605            for report_xml in glob.glob(os.path.join(report_path, '*.xml')):
606                try:
607                    os.remove(report_xml)
608                except Exception as exception:
609                    LOG.error(
610                        "remove {} Failed:{}".format(report_xml, exception))
611                    pass
612
613    def __result__(self):
614        return self.result if os.path.exists(self.result) else ""
615
616
617@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
618class CTestDriver(IDriver):
619    """
620    CTest is a test that runs a native test package on given lite device.
621    """
622    config = None
623    result = ""
624    error_message = ""
625    version_cmd = "AT+CSV"
626
627    def __init__(self):
628        self.file_name = ""
629        self.run_third = False
630        self.kit_type = None
631        self.auto_deploy = None
632
633    def __check_environment__(self, device_options):
634        if len(device_options) != 1 or \
635                device_options[0].label != DeviceLabelType.wifiiot:
636            self.error_message = "check environment failed"
637            return False
638        return True
639
640    def __check_config__(self, config=None):
641        del config
642        self.config = None
643
644    def __execute__(self, request):
645        from xdevice import Variables
646        try:
647            self.config = request.config
648            self.config.device = request.config.environment.devices[0]
649            current_dir = request.config.resource_path if \
650                request.config.resource_path else Variables.exec_dir
651            if request.root.source.source_file.strip():
652                source = os.path.join(current_dir,
653                                      request.root.source.source_file.strip())
654                self.file_name = os.path.basename(
655                    request.root.source.source_file.strip()).split(".")[0]
656            else:
657                source = request.root.source.source_string.strip()
658            json_config = JsonParser(source)
659            kit_instances = get_kit_instances(json_config,
660                                              request.config.resource_path,
661                                              request.config.testcases_path)
662            for (_, kit_info) in zip(kit_instances, json_config.get_kits()):
663                self.auto_deploy = kit_info.get("auto_deploy", "")
664                self.kit_type = kit_info.get("type", "")
665                if self.kit_type == CKit.deploytool:
666                    self.run_third = True
667            LOG.info("Kit type:{}".format(self.kit_type))
668            LOG.info("Auto deploy:{}".format(self.auto_deploy))
669            LOG.info("Run third:{}".format(self.run_third))
670            if self.run_third:
671                LOG.info("Run the third-party vendor part")
672                if self.auto_deploy in ["False", "flase"]:
673                    self._run_ctest_third_party(source=source, request=request)
674                else:
675                    self._run_ctest_upgrade_party(source=source, request=request)
676            else:
677                LOG.debug("Run ctest")
678                self._run_ctest(source=source, request=request)
679
680        except (LiteDeviceExecuteCommandError, Exception) as exception:
681            LOG.error(exception, error_no=getattr(exception, "error_no",
682                                                  "00000"))
683            self.error_message = exception
684        finally:
685            report_name = "report" if request.root.source. \
686                test_name.startswith("{") else get_filename_extension(
687                request.root.source.test_name)[0]
688            self.result = check_result_report(
689                request.config.report_path, self.result, self.error_message,
690                report_name)
691
692    def _run_ctest(self, source=None, request=None, timeout=90):
693        parser_instances = []
694        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
695        try:
696            if not source:
697                LOG.error("Error: source don't exist %s." % source,
698                          error_no="00101")
699                return
700
701            version = get_test_component_version(self.config)
702
703            for parser in parsers:
704                parser_instance = parser.__class__()
705                parser_instance.suites_name = self.file_name
706                parser_instance.product_info.setdefault("Version", version)
707                parser_instance.listeners = request.listeners
708                parser_instances.append(parser_instance)
709            handler = ShellHandler(parser_instances)
710
711            reset_cmd = self._reset_device(request, source)
712            self.result = "%s.xml" % os.path.join(
713                request.config.report_path, "result", self.file_name)
714            self.config.device.device.com_dict.get(
715                ComType.deploy_com).connect()
716            result, _, error = self.config.device.device. \
717                execute_command_with_timeout(
718                    command=reset_cmd, case_type=DeviceTestType.ctest_lite,
719                    key=ComType.deploy_com, timeout=timeout, receiver=handler)
720            device_log_file = get_device_log_file(
721                request.config.report_path,
722                request.config.device.__get_serial__())
723            device_log_file_open = \
724                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
725                        os.O_APPEND, FilePermission.mode_755)
726            with os.fdopen(device_log_file_open, "a") as file_name:
727                file_name.write("{}{}".format(
728                    "\n".join(result.split("\n")[0:-1]), "\n"))
729                file_name.flush()
730        finally:
731            device = self.config.device
732            device.device.com_dict.get(ComType.deploy_com).close()
733            for parser_instance in parser_instances:
734                if hasattr(parser_instance, "product_info"):
735                    product_info = parser_instance.product_info
736                    device.update_device_props(product_info)
737
738    def _run_ctest_third_party(self, source=None, request=None, timeout=5):
739        parser_instances = []
740        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
741        try:
742            if not source:
743                LOG.error("Error: source don't exist %s." % source,
744                          error_no="00101")
745                return
746
747            version = get_test_component_version(self.config)
748
749            for parser in parsers:
750                parser_instance = parser.__class__()
751                parser_instance.suites_name = self.file_name
752                parser_instance.product_info.setdefault("Version", version)
753                parser_instance.listeners = request.listeners
754                parser_instances.append(parser_instance)
755            handler = ShellHandler(parser_instances)
756
757            while True:
758                input_burning = input("Please enter 'y' or 'n' "
759                                      "after the burning  is complete,"
760                                      "enter 'quit' to exit:")
761                if input_burning.lower().strip() in ["y", "yes"]:
762                    LOG.info("Burning succeeded.")
763                    break
764                elif input_burning.lower().strip() in ["n", "no"]:
765                    LOG.info("Burning failed.")
766                elif input_burning.lower().strip() == "quit":
767                    break
768                else:
769                    LOG.info({"The input parameter is incorrect,please"
770                              " enter 'y' or 'n' after the burning is "
771                              "complete,enter 'quit' to exit."})
772            LOG.info("Please press the reset button on the device ")
773            time.sleep(5)
774            self.result = "%s.xml" % os.path.join(
775                request.config.report_path, "result", self.file_name)
776            self.config.device.device.com_dict.get(
777                ComType.deploy_com).connect()
778
779            LOG.debug("Device com:{}".format(self.config.device.device))
780            result, _, error = self.config.device.device. \
781                execute_command_with_timeout(
782                command=[], case_type=DeviceTestType.ctest_lite,
783                key=ComType.deploy_com, timeout=timeout, receiver=handler)
784            device_log_file = get_device_log_file(
785                request.config.report_path,
786                request.config.device.__get_serial__())
787            device_log_file_open = \
788                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
789                        os.O_APPEND, FilePermission.mode_755)
790            with os.fdopen(device_log_file_open, "a") as file_name:
791                file_name.write("{}{}".format(
792                    "\n".join(result.split("\n")[0:-1]), "\n"))
793                file_name.flush()
794        finally:
795            self.config.device.device.com_dict.get(
796                ComType.deploy_com).close()
797
798    def _run_ctest_upgrade_party(self, source=None, request=None, timeout=90):
799        parser_instances = []
800        parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
801        try:
802            if not source:
803                LOG.error("Error: source don't exist %s." % source,
804                          error_no="00101")
805                return
806
807            version = get_test_component_version(self.config)
808
809            for parser in parsers:
810                parser_instance = parser.__class__()
811                parser_instance.suites_name = self.file_name
812                parser_instance.product_info.setdefault("Version", version)
813                parser_instance.listeners = request.listeners
814                parser_instances.append(parser_instance)
815            handler = ShellHandler(parser_instances)
816            result = self._reset_third_device(request, source)
817            self.result = "%s.xml" % os.path.join(
818                request.config.report_path, "result", self.file_name)
819            if isinstance(result, list):
820                self.config.device.device.com_dict.get(
821                    ComType.deploy_com).connect()
822                LOG.debug("Device com:{}".format(self.config.device.device))
823                result, _, error = self.config.device.device. \
824                    execute_command_with_timeout(
825                    command=request, case_type=DeviceTestType.ctest_lite,
826                    key=ComType.deploy_com, timeout=timeout, receiver=handler)
827            else:
828                handler.__read__(result)
829                handler.__done__()
830            device_log_file = get_device_log_file(
831                request.config.report_path,
832                request.config.device.__get_serial__())
833            device_log_file_open = \
834                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
835                        os.O_APPEND, FilePermission.mode_755)
836            with os.fdopen(device_log_file_open, "a") as file_name:
837                file_name.write("{}{}".format(
838                    "\n".join(result.split("\n")[0:-1]), "\n"))
839                file_name.flush()
840        finally:
841            self.config.device.device.com_dict.get(
842                ComType.deploy_com).close()
843
844    def _reset_device(self, request, source):
845        json_config = JsonParser(source)
846        reset_cmd = []
847        kit_instances = get_kit_instances(json_config,
848                                          request.config.resource_path,
849                                          request.config.testcases_path)
850        from xdevice import Scheduler
851        for (kit_instance, kit_info) in zip(kit_instances,
852                                            json_config.get_kits()):
853            if not isinstance(kit_instance, DeployKit):
854                continue
855            if not self.file_name:
856                self.file_name = get_config_value(
857                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
858            reset_cmd = kit_instance.burn_command
859            if not Scheduler.is_execute:
860                raise ExecuteTerminate("ExecuteTerminate",
861                                       error_no="00300")
862            kit_instance.__setup__(
863                self.config.device)
864        reset_cmd = [int(item, 16) for item in reset_cmd]
865        return reset_cmd
866
867    def _reset_third_device(self, request, source):
868        json_config = JsonParser(source)
869        reset_cmd = []
870        kit_instances = get_kit_instances(json_config,
871                                          request.config.resource_path,
872                                          request.config.testcases_path)
873        from xdevice import Scheduler
874        for (kit_instance, kit_info) in zip(kit_instances,
875                                            json_config.get_kits()):
876            if not isinstance(kit_instance, DeployToolKit):
877                continue
878            if not self.file_name:
879                self.file_name = get_config_value(
880                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
881            if not Scheduler.is_execute:
882                raise ExecuteTerminate("ExecuteTerminate",
883                                       error_no="00300")
884            reset_cmd = kit_instance.__setup__(
885                self.config.device)
886        return reset_cmd
887
888    def __result__(self):
889        return self.result if os.path.exists(self.result) else ""
890
891
892@Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test)
893class OpenSourceTestDriver(IDriver):
894    """
895    OpenSourceTest is a test that runs a native test package on given
896    device lite device.
897    """
898    config = None
899    result = ""
900    error_message = ""
901    has_param = False
902
903    def __init__(self):
904        self.rerun = True
905        self.file_name = ""
906        self.handler = None
907
908    def __check_environment__(self, device_options):
909        if len(device_options) != 1 or \
910                device_options[0].label != DeviceLabelType.ipcamera:
911            self.error_message = "check environment failed"
912            return False
913        return True
914
915    def __check_config__(self, config=None):
916        pass
917
918    def __execute__(self, request):
919        kits = []
920        try:
921            self.config = request.config
922            setattr(self.config, "command_result", "")
923            self.config.device = request.config.environment.devices[0]
924            init_remote_server(self, request)
925            config_file = request.root.source.config_file
926            json_config = JsonParser(config_file)
927            pre_cmd = get_config_value('pre_cmd', json_config.get_driver(),
928                                       False)
929            execute_dir = get_config_value('execute', json_config.get_driver(),
930                                           False)
931            kits = get_kit_instances(json_config,
932                                     request.config.resource_path,
933                                     request.config.testcases_path)
934            from xdevice import Scheduler
935            for kit in kits:
936                if not Scheduler.is_execute:
937                    raise ExecuteTerminate("ExecuteTerminate",
938                                           error_no="00300")
939                copy_list = kit.__setup__(request.config.device,
940                                          request=request)
941
942            self.file_name = request.root.source.test_name
943            self.set_file_name(request, request.root.source.test_name)
944            self.config.device.execute_command_with_timeout(
945                command=pre_cmd, timeout=1)
946            self.config.device.execute_command_with_timeout(
947                command="cd {}".format(execute_dir), timeout=1)
948            device_log_file = get_device_log_file(
949                request.config.report_path,
950                request.config.device.__get_serial__())
951            device_log_file_open = \
952                os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
953                        os.O_APPEND, FilePermission.mode_755)
954            with os.fdopen(device_log_file_open, "a") as file_name:
955                for test_bin in copy_list:
956                    if not test_bin.endswith(".run-test"):
957                        continue
958                    if test_bin.startswith("/"):
959                        command = ".%s" % test_bin
960                    else:
961                        command = "./%s" % test_bin
962                    self._do_test_run(command, request)
963                    file_name.write(self.config.command_result)
964                file_name.flush()
965
966        except (LiteDeviceExecuteCommandError, Exception) as exception:
967            LOG.error(exception, error_no=getattr(exception, "error_no",
968                                                  "00000"))
969            self.error_message = exception
970        finally:
971            LOG.info("-------------finally-----------------")
972            # umount the dirs already mount
973            for kit in kits:
974                kit.__teardown__(request.config.device)
975            self.config.device.close()
976            report_name = "report" if request.root.source. \
977                test_name.startswith("{") else get_filename_extension(
978                    request.root.source.test_name)[0]
979            self.result = check_result_report(
980                request.config.report_path, self.result, self.error_message,
981                report_name)
982
983    def set_file_name(self, request, bin_file):
984        self.result = "%s.xml" % os.path.join(
985            request.config.report_path, "result", bin_file)
986
987    def run(self, command=None, listener=None, timeout=20):
988        parsers = get_plugin(Plugin.PARSER,
989                             ParserType.open_source_test)
990        parser_instances = []
991        for parser in parsers:
992            parser_instance = parser.__class__()
993            parser_instance.suite_name = self.file_name
994            parser_instance.test_name = command.replace("./", "")
995            parser_instance.listeners = listener
996            parser_instances.append(parser_instance)
997        self.handler = ShellHandler(parser_instances)
998        for _ in range(3):
999            result, _, error = self.config.device.execute_command_with_timeout(
1000                command=command, case_type=DeviceTestType.open_source_test,
1001                timeout=timeout, receiver=self.handler)
1002            self.config.command_result = result
1003            if "pass" in result.lower():
1004                break
1005        return error, result, self.handler
1006
1007    def _do_test_run(self, command, request):
1008        listeners = request.listeners
1009        for listener in listeners:
1010            listener.device_sn = self.config.device.device_sn
1011        error, _, _ = self.run(command, listeners, timeout=60)
1012        if error:
1013            LOG.error(
1014                "Execute %s failed" % command, error_no="00402")
1015
1016    def __result__(self):
1017        return self.result if os.path.exists(self.result) else ""
1018
1019
1020@Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test)
1021class BuildOnlyTestDriver(IDriver):
1022    """
1023    BuildOnlyTest is a test that runs a native test package on given
1024    device lite device.
1025    """
1026    config = None
1027    result = ""
1028    error_message = ""
1029
1030    def __check_environment__(self, device_options):
1031        pass
1032
1033    def __check_config__(self, config):
1034        pass
1035
1036    def __execute__(self, request):
1037        self.config = request.config
1038        self.config.device = request.config.environment.devices[0]
1039        self.file_name = request.root.source.test_name
1040        self.config_file = request.root.source.config_file
1041        self.testcases_path = request.config.testcases_path
1042        file_path = self._get_log_file()
1043        result_list = self._get_result_list(file_path)
1044        if len(result_list) == 0:
1045            LOG.error(
1046                "Error: source don't exist %s." % request.root.source.
1047                source_file, error_no="00101")
1048            return
1049        total_result = ''
1050        for result in result_list:
1051            flags = os.O_RDONLY
1052            modes = stat.S_IWUSR | stat.S_IRUSR
1053            with os.fdopen(os.open(result, flags, modes), "r",
1054                           encoding="utf-8") as file_content:
1055                result = file_content.read()
1056                if not result.endswith('\n'):
1057                    result = '%s\n' % result
1058            total_result = '{}{}'.format(total_result, result)
1059        parsers = get_plugin(Plugin.PARSER, ParserType.build_only_test)
1060        parser_instances = []
1061        for parser in parsers:
1062            parser_instance = parser.__class__()
1063            parser_instance.suite_name = self.file_name
1064            parser_instance.listeners = request.listeners
1065            parser_instances.append(parser_instance)
1066        handler = ShellHandler(parser_instances)
1067        generate_report(handler, total_result)
1068
1069    @classmethod
1070    def _get_result_list(cls, file_path):
1071        result_list = list()
1072        for root_path, dirs_path, file_names in os.walk(file_path):
1073            for file_name in file_names:
1074                if file_name == "logfile":
1075                    result_list.append(os.path.join(root_path, file_name))
1076        return result_list
1077
1078    def _get_log_file(self):
1079        json_config = JsonParser(self.config_file)
1080        log_path = get_config_value('log_path', json_config.get_driver(),
1081                                    False)
1082        log_path = str(log_path.replace("/", "", 1)) if log_path.startswith(
1083            "/") else str(log_path)
1084        LOG.debug("The log path is:%s" % log_path)
1085        file_path = get_file_absolute_path(log_path,
1086                                           paths=[self.testcases_path])
1087        LOG.debug("The file path is:%s" % file_path)
1088        return file_path
1089
1090    def __result__(self):
1091        return self.result if os.path.exists(self.result) else ""
1092
1093
1094@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
1095class JSUnitTestLiteDriver(IDriver):
1096    """
1097    JSUnitTestDriver is a Test that runs a native test package on given device.
1098    """
1099
1100    def __init__(self):
1101        self.result = ""
1102        self.error_message = ""
1103        self.kits = []
1104        self.config = None
1105
1106    def __check_environment__(self, device_options):
1107        pass
1108
1109    def __check_config__(self, config):
1110        pass
1111
1112    def _get_driver_config(self, json_config):
1113        bundle_name = get_config_value('bundle-name',
1114                                       json_config.get_driver(), False)
1115        if not bundle_name:
1116            raise ParamError("Can't find bundle-name in config file.",
1117                             error_no="00108")
1118        else:
1119            self.config.bundle_name = bundle_name
1120
1121        ability = get_config_value('ability',
1122                                   json_config.get_driver(), False)
1123        if not ability:
1124            self.config.ability = "default"
1125        else:
1126            self.config.ability = ability
1127
1128    def __execute__(self, request):
1129        try:
1130            LOG.debug("Start execute xdevice extension JSUnit Test")
1131
1132            self.config = request.config
1133            self.config.device = request.config.environment.devices[0]
1134
1135            config_file = request.root.source.config_file
1136            suite_file = request.root.source.source_file
1137
1138            if not suite_file:
1139                raise ParamError(
1140                    "test source '%s' not exists" %
1141                    request.root.source.source_string, error_no="00101")
1142
1143            if not os.path.exists(config_file):
1144                LOG.error("Error: Test cases don't exist %s." % config_file,
1145                          error_no="00101")
1146                raise ParamError(
1147                    "Error: Test cases don't exist %s." % config_file,
1148                    error_no="00101")
1149
1150            self.file_name = os.path.basename(
1151                request.root.source.source_file.strip()).split(".")[0]
1152
1153            self.result = "%s.xml" % os.path.join(
1154                request.config.report_path, "result", self.file_name)
1155
1156            json_config = JsonParser(config_file)
1157            self.kits = get_kit_instances(json_config,
1158                                          self.config.resource_path,
1159                                          self.config.testcases_path)
1160
1161            self._get_driver_config(json_config)
1162            from xdevice import Scheduler
1163            for kit in self.kits:
1164                if not Scheduler.is_execute:
1165                    raise ExecuteTerminate("ExecuteTerminate",
1166                                           error_no="00300")
1167                if kit.__class__.__name__ == CKit.liteinstall:
1168                    kit.bundle_name = self.config.bundle_name
1169                kit.__setup__(self.config.device, request=request)
1170
1171            self._run_jsunit(request)
1172
1173        except Exception as exception:
1174            self.error_message = exception
1175        finally:
1176            report_name = "report" if request.root.source. \
1177                test_name.startswith("{") else get_filename_extension(
1178                request.root.source.test_name)[0]
1179
1180            self.result = check_result_report(
1181                request.config.report_path, self.result, self.error_message,
1182                report_name)
1183
1184            for kit in self.kits:
1185                kit.__teardown__(self.config.device)
1186            self.config.device.close()
1187
1188    def _run_jsunit(self, request):
1189        parser_instances = []
1190        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
1191        for parser in parsers:
1192            parser_instance = parser.__class__()
1193            parser_instance.suites_name = self.file_name
1194            parser_instance.listeners = request.listeners
1195            parser_instances.append(parser_instance)
1196        handler = ShellHandler(parser_instances)
1197
1198        command = "./bin/aa start -p %s -n %s" % \
1199                  (self.config.bundle_name, self.config.ability)
1200        result, _, error = self.config.device.execute_command_with_timeout(
1201            command=command, timeout=300, receiver=handler)
1202        device_log_file = get_device_log_file(
1203            request.config.report_path,
1204            request.config.device.__get_serial__())
1205        device_log_file_open =\
1206            os.open(device_log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
1207                    FilePermission.mode_755)
1208        with os.fdopen(device_log_file_open, "a") as file_name:
1209            file_name.write("{}{}".format(
1210                "\n".join(result.split("\n")[0:-1]), "\n"))
1211            file_name.flush()
1212
1213    def __result__(self):
1214        return self.result if os.path.exists(self.result) else ""
1215