• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
import os
import posixpath
import subprocess
import threading
import time

from xdevice import convert_mac
from xdevice import ConfigConst
from xdevice import FilePermission
from xdevice import ParamError
from xdevice import get_config_value
from xdevice import platform_logger
from ohos.config.config_manager import OHOSUserConfigManager
from ohos.error import ErrorMessage
32
# Module-level logger, created by xdevice's platform_logger under the name
# "DriverConstant"; used by the helpers and classes below.
LOG = platform_logger("DriverConstant")
# Timeout constant with value 300 * 1000. Presumably milliseconds (i.e. five
# minutes) -- TODO confirm the unit against the callers; none are visible in
# this file.
TIME_OUT = 300 * 1000
35
36
def get_xml_output(config, json_config):
    """Resolve the "xml-output" switch and return it as a lower-case string.

    Lookup order:
      1. ``config.testargs["xml-output"]`` -- when present and non-empty, its
         first element is used.
      2. The "xml-output" entry of the driver section returned by
         ``json_config.get_driver()`` (read through ``get_config_value``).
      3. The literal ``"false"``.

    :param config: object exposing a ``testargs`` mapping.
    :param json_config: object exposing ``get_driver()``; only consulted when
        the test arguments do not carry the switch.
    :return: ``str(value).lower()`` of whichever source supplied the value.
    """
    from_testargs = config.testargs.get("xml-output")
    if from_testargs:
        selected = from_testargs[0]
    else:
        # Single lookup: the original fetched the very same driver value twice
        # (once to test it, once to use it).
        selected = get_config_value('xml-output',
                                    json_config.get_driver(), False) or "false"
    return str(selected).lower()
49
50
def get_nfs_server(request):
    """Return the "NfsServer" entry of the user config's testcases/server node.

    An ``OHOSUserConfigManager`` is built from the config file and test
    environment carried by ``request`` (both default to an empty string).

    :param request: mapping-like object providing ``get(key, default)``.
    :return: whatever ``get_user_config`` yields for the "NfsServer" filter.
    :raises ParamError: with ``ErrorMessage.Config.Code_0302018`` (also logged
        as an error) when the lookup result is empty.
    """
    user_config_file = request.get(ConfigConst.configfile, "")
    environment = request.get(ConfigConst.test_environment, "")
    manager = OHOSUserConfigManager(config_file=user_config_file,
                                    env=environment)

    nfs_info = manager.get_user_config("testcases/server",
                                       filter_name="NfsServer")
    if nfs_info:
        return nfs_info

    message = ErrorMessage.Config.Code_0302018
    LOG.error(message)
    raise ParamError(message)
62
63
def init_remote_server(lite_instance, request=None):
    """Copy the testcases/server settings of the user config onto an object.

    On success ``lite_instance`` gains three attributes taken from the
    "testcases/server" node: ``linux_host`` (key "ip"), ``linux_port``
    (key "port") and ``linux_directory`` (key "dir"). Missing keys yield None.

    :param lite_instance: object that receives the attributes.
    :param request: mapping-like object providing ``get(key, default)`` for
        the config file and test environment. ``None`` (the default) is
        treated as an empty mapping, so both values fall back to "".
    :raises ParamError: with ``ErrorMessage.Config.Code_0302019`` when the
        "testcases/server" node is empty.
    """
    # The signature advertises request=None, but the original dereferenced it
    # unconditionally and died with AttributeError. Fall back to an empty
    # mapping so the same "" defaults are used as for a request lacking keys.
    # NOTE(review): assumes OHOSUserConfigManager accepts empty values and
    # falls back to its own defaults -- confirm.
    if request is None:
        request = {}

    config_manager = OHOSUserConfigManager(
        config_file=request.get(ConfigConst.configfile, ""),
        env=request.get(ConfigConst.test_environment, ""))
    linux_dict = config_manager.get_user_config("testcases/server")

    if not linux_dict:
        raise ParamError(ErrorMessage.Config.Code_0302019)

    lite_instance.linux_host = linux_dict.get("ip")
    lite_instance.linux_port = linux_dict.get("port")
    lite_instance.linux_directory = linux_dict.get("dir")
77
78
def create_empty_result_file(filepath, filename, error_message):
    """Write a placeholder result XML when no result file exists yet.

    The file contains a single ``<testsuite>`` with zero tests, flagged
    ``unavailable="1"`` and carrying ``error_message`` in its ``message``
    attribute. Nothing is written when ``filepath`` already exists.

    :param filepath: path of the XML file to create.
    :param filename: suite name used for the ``name`` attribute; for names
        ending in ".hap" only the part before the first "." is used.
    :param error_message: any object; converted with ``str()`` and
        XML-escaped before being embedded.
    :return: None
    """
    error_message = str(error_message)
    # "&" has to be escaped first: doing it last (as before) re-escapes the
    # ampersands of the entities just produced ("<" -> "&lt;" -> "&amp;lt;").
    for raw, entity in (("&", "&amp;"),
                        ("\"", "&quot;"),
                        ("<", "&lt;"),
                        (">", "&gt;")):
        error_message = error_message.replace(raw, entity)

    if filename.endswith(".hap"):
        filename = filename.split(".")[0]

    if os.path.exists(filepath):
        return

    file_open = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                        FilePermission.mode_755)
    with os.fdopen(file_open, "w") as file_desc:
        time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        file_desc.write('<testsuites tests="0" failures="0" '
                        'disabled="0" errors="0" timestamp="%s" '
                        'time="0" name="AllTests">\n' % time_stamp)
        file_desc.write(
            '  <testsuite name="%s" tests="0" failures="0" '
            'disabled="0" errors="0" time="0.0" '
            'unavailable="1" message="%s">\n' %
            (filename, error_message))
        file_desc.write('  </testsuite>\n')
        file_desc.write('</testsuites>\n')
106
107
def get_result_savepath(testsuit_path, result_rootpath):
    """Work out (and create) the host directory for a suite's result files.

    The directory of ``testsuit_path`` is searched for a "tests" path
    component. When it is found and at least one more separator follows it,
    the first component after "tests" is dropped and the remainder is
    appended to ``<result_rootpath>/result``. In every other case the plain
    ``<result_rootpath>/result`` directory is used.

    :param testsuit_path: path of the test suite file.
    :param result_rootpath: root directory for results on the host.
    :return: the result directory; it is created if it does not exist.
    """
    marker = "%stests%s" % (os.sep, os.sep)
    suite_dir = os.path.split(testsuit_path)[0]

    # partition() splits at the first occurrence, like find() + slicing.
    _, marker_hit, below_tests = suite_dir.partition(marker)
    _, sep_hit, relative_dir = below_tests.partition(os.sep)

    if marker_hit and sep_hit:
        result_path = os.path.join(result_rootpath, "result", relative_dir)
    else:
        result_path = os.path.join(result_rootpath, "result")

    if not os.path.exists(result_path):
        os.makedirs(result_path)

    LOG.info("Result save path = %s" % result_path)
    return result_path
128
129
class ResultManager(object):
    """Fetch a test suite's result file (and optional coverage data) from a
    device into the host-side result tree.

    Device-side paths are joined with ``posixpath`` rather than ``os.path``:
    the device is driven through a POSIX shell (``ls | grep``), so the host's
    separator must not leak into remote paths when the host runs Windows.
    Host-side paths keep using ``os.path``.
    """

    def __init__(self, testsuit_path, result_rootpath, device,
                 device_testpath):
        """
        :param testsuit_path: path of the test suite; its basename becomes
            ``testsuite_name``.
        :param result_rootpath: host root directory for results.
        :param device: object providing ``is_file_exist``, ``pull_file`` and
            ``execute_shell_command``.
        :param device_testpath: directory on the device holding the outputs.
        """
        self.testsuite_path = testsuit_path
        self.result_rootpath = result_rootpath
        self.device = device
        self.device_testpath = device_testpath
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.is_coverage = False

    def set_is_coverage(self, is_coverage):
        """Enable or disable pulling of coverage data in get_test_results."""
        self.is_coverage = is_coverage

    def _device_path(self, name):
        """Join ``name`` onto the device test directory with "/" separators."""
        return posixpath.join(self.device_testpath, name)

    def get_test_results(self, error_message=""):
        """Pull the result file, creating a placeholder if none was obtained.

        :param error_message: text stored in the placeholder result file.
        :return: host path of the result XML file.
        """
        # Get test result files
        filepath = self.obtain_test_result_file()
        if not os.path.exists(filepath):
            create_empty_result_file(filepath, self.testsuite_name,
                                     error_message)

        # Get coverage data files
        if self.is_coverage:
            self.obtain_coverage_data()

        return filepath

    def obtain_test_result_file(self):
        """Pull the suite's result XML from the device if it exists there.

        For ".hap" suites the device file is "testcase_result.xml" or, failing
        that, "report.xml"; it is stored as ``<name before first dot>.xml``.
        For other suites the device file is ``<testsuite_name>.xml`` and it is
        pulled into the result directory. A missing device file is logged as
        an error and nothing is pulled.

        :return: the expected host path of the result file (which may not
            exist when nothing was pulled).
        """
        result_savepath = get_result_savepath(self.testsuite_path,
                                              self.result_rootpath)
        if self.testsuite_path.endswith('.hap'):
            local_name = "%s.xml" % str(self.testsuite_name).split(".")[0]
            filepath = os.path.join(result_savepath, local_name)

            remote_result_name = ""
            # Checked in order of preference; the first one present wins.
            for candidate in ("testcase_result.xml", "report.xml"):
                if self.device.is_file_exist(self._device_path(candidate)):
                    remote_result_name = candidate
                    break

            if remote_result_name:
                self.device.pull_file(self._device_path(remote_result_name),
                                      filepath)
            else:
                LOG.error("%s no report file", self.device_testpath)

        else:
            filepath = os.path.join(result_savepath, "%s.xml" %
                                    self.testsuite_name)
            remote_result_file = self._device_path(
                "%s.xml" % self.testsuite_name)

            if self.device.is_file_exist(remote_result_file):
                self.device.pull_file(remote_result_file, result_savepath)
            else:
                LOG.error("%s not exists", remote_result_file)
        return filepath

    def is_exist_target_in_device(self, path, target):
        """Report whether ``ls -l <path> | grep <target>`` mentions ``target``.

        NOTE(review): this is a substring match on the listing, so a target
        such as "obj" also matches longer names containing it; ``path`` and
        ``target`` are interpolated into the shell command unquoted.

        :return: True when the command output is non-empty and contains
            ``target``; False otherwise (including a None/empty output).
        """
        command = "ls -l %s | grep %s" % (path, target)
        stdout_info = self.device.execute_shell_command(command)
        # Treat None like empty output instead of failing on None.find().
        if not stdout_info:
            return False
        return target in stdout_info

    def obtain_coverage_data(self):
        """Pull coverage artifacts from the device test directory, if present.

        * "jacoco.exec" is stored as
          ``<result_rootpath>/../coverage/data/exec/<testsuite_name>.exec``.
        * "obj" is pulled into
          ``<result_rootpath>/../coverage/data/cxx/<testsuite_name>``.

        Host directories are created on demand.
        """
        java_cov_path = os.path.abspath(
            os.path.join(self.result_rootpath, "..", "coverage/data/exec"))
        dst_target_name = "%s.exec" % self.testsuite_name
        src_target_name = "jacoco.exec"
        if self.is_exist_target_in_device(self.device_testpath,
                                          src_target_name):
            if not os.path.exists(java_cov_path):
                os.makedirs(java_cov_path)
            self.device.pull_file(
                self._device_path(src_target_name),
                os.path.join(java_cov_path, dst_target_name))

        cxx_cov_path = os.path.abspath(
            os.path.join(self.result_rootpath, "..", "coverage/data/cxx",
                         self.testsuite_name))
        target_name = "obj"
        if self.is_exist_target_in_device(self.device_testpath, target_name):
            if not os.path.exists(cxx_cov_path):
                os.makedirs(cxx_cov_path)
            self.device.pull_file(self._device_path(target_name),
                                  cxx_cov_path)
221
222
class JavaThread(threading.Thread):
    """Thread that runs a command after an optional, cancellable delay and
    streams the command's stdout into a log file and an optional handler.

    ``cancel()`` only prevents a command that has not started yet;
    ``stop()`` kills a command that is already running.
    """

    def __init__(self, name, log_path, command, execute_jar_path, handler=None, wait_time=0):
        """
        :param name: thread name.
        :param log_path: file that receives the command's decoded stdout
            (opened in append mode).
        :param command: command passed unchanged to ``subprocess.Popen``.
        :param execute_jar_path: working directory for the command.
        :param handler: optional object whose ``__read__(line)`` is called
            for every output line (with carriage returns removed).
        :param wait_time: seconds to wait before starting the command.
        """
        super().__init__()
        self.name = name
        self.log_path = log_path
        self.command = command
        self.handler = handler
        self.wait_time = wait_time
        self.jar_path = execute_jar_path
        self.jar_process = None
        self.finished = threading.Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run_java_test(self):
        """Run the command to completion, mirroring its stdout line by line.

        Each line is decoded as GBK, appended to the log file and, when a
        handler is set, forwarded to ``handler.__read__``.
        """
        LOG.info('start to run java testcase')
        LOG.debug('run java command: {}'.format(convert_mac(self.command)))
        # Open the log before spawning: if the log cannot be opened there is
        # no child left running with nobody draining its pipe.
        jar_log_open = os.open(self.log_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
                               FilePermission.mode_755)
        with os.fdopen(jar_log_open, "a+") as file_data:
            self.jar_process = subprocess.Popen(self.command, stdout=subprocess.PIPE,
                                                cwd=self.jar_path)
            try:
                for raw_line in iter(self.jar_process.stdout.readline, b''):
                    # errors='replace': a single byte that is not valid GBK
                    # must not abort the whole reader thread.
                    line = raw_line.decode('GBK', errors='replace')
                    file_data.write(line)
                    if self.handler:
                        self.handler.__read__(line.replace('\r', ''))
            finally:
                # Release the pipe even when writing or the handler raises.
                self.jar_process.stdout.close()
        self.jar_process.wait()

    def run(self):
        """Wait ``wait_time`` seconds, then run the command unless cancelled."""
        self.finished.wait(self.wait_time)
        if not self.finished.is_set():
            self.run_java_test()
        self.finished.set()

    def stop(self):
        """Kill the running command together with its child processes."""
        LOG.info("Stop java process")
        self.kill_proc_and_subproc(self.jar_process)

    def kill_proc_and_subproc(self, proc):
        """ Stops a process started and subprocess
        started by process by kill_proc_and_subproc.

        Uses the Windows ``taskkill /T /F`` tool. The call is best effort and
        fire-and-forget: a missing process or a failure to launch ``taskkill``
        is only logged at debug level.

        :param proc: process object exposing ``pid``; may be None when the
            command was never started.
        """
        pid = getattr(proc, "pid", None)
        if pid is None:
            # stop() was called before the command was launched.
            LOG.debug("kill subprocess exception error:{}".format(
                "no process to kill"))
            return
        try:
            # Output is never read, so send it to DEVNULL instead of leaving
            # unread pipes behind.
            subprocess.Popen(["taskkill", "/T", "/F", "/PID", str(pid)],
                             stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL,
                             shell=False)
        except (OSError, ValueError) as _err:
            LOG.debug("kill subprocess exception error:{}".format(_err))
278