• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import os
19import sys
20import time
21import re
22
23from xdevice import DeviceTestType
24from xdevice import FilePermission
25from xdevice import HcpTestMode
26from xdevice import convert_mac
27from xdevice import ExecuteTerminate
28from xdevice import platform_logger
29from ohos.error import ErrorMessage
30from ohos.exception import LiteDeviceTimeout
31from ohos.exception import LiteDeviceConnectError
32from ohos.exception import LiteDeviceExecuteCommandError
33
34__all__ = ["generate_report", "LiteHelper"]
35
36CPP_TEST_STANDARD_SIGN = "[==========]"
37CPP_TEST_END_SIGN = "Gtest xml output finished"
38CPP_SYS_STANDARD_SIGN = "OHOS #"
39CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
40CTEST_STANDARD_SIGN = "Start to run test suite"
41AT_CMD_ENDS = "OK"
42CTEST_END_SIGN = "All the test suites finished"
43CPP_TEST_STOP_SIGN = "Test Stop"
44CPP_TEST_MOUNT_SIGN = "not mount properly"
45
46_START_JSUNIT_RUN_MARKER = "[start] start run suites"
47_END_JSUNIT_RUN_MARKER = "[end] run suites end"
48INSTALL_END_MARKER = "resultMessage is install success !"
49PRODUCT_PARAMS_START = "To Obtain Product Params Start"
50PRODUCT_PARAMS_END = "To Obtain Product Params End"
51
52PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
53TIMEOUT = 90
54STATUS_OK_CODE = 200
55LOG = platform_logger("DmlibLite")
56
57
def check_open_source_test(result_output):
    """Tell whether the output looks like an open-source style test result.

    Returns True only when the gtest banner (CPP_TEST_STANDARD_SIGN) is
    absent and the output, compared case-insensitively, contains one of
    "test pass", "test fail", "tests pass" or "tests fail".
    """
    if CPP_TEST_STANDARD_SIGN in result_output:
        return False
    lowered = result_output.lower()
    verdict_phrases = ("test pass", "test fail", "tests pass", "tests fail")
    return any(phrase in lowered for phrase in verdict_phrases)
66
67
def check_read_test_end(result=None, input_command=None):
    """Decide whether the output read so far marks the end of a command.

    Parameters:
        result: all output accumulated from the device so far
        input_command: the command that was sent to the device

    Returns:
        True when an end condition for this kind of command is found in the
        output that follows the command echo, otherwise False.

    Raises:
        LiteDeviceExecuteCommandError: a "./" command was answered with the
            "No such file or directory" message for that executable.
    """
    # Only the text after the echoed command is inspected. When the echo is
    # not (yet) present, find() returns -1 and the slice therefore starts at
    # len(input_command) - 1.
    index = result.find(input_command) + len(input_command)
    result_output = result[index:]
    if input_command.startswith("./"):
        # Executable run on the device: gtest, open-source tests, JS unit
        # runs and installs each have their own end marker.
        if result_output.find(CPP_TEST_STANDARD_SIGN) != -1:
            # A second banner or the explicit xml-finished line ends gtest.
            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
                    result_output.find(CPP_TEST_END_SIGN) != -1:
                return True
        if check_open_source_test(result_output):
            return True
        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
            return True

        if result_output.find(INSTALL_END_MARKER) != -1:
            return True
        if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1
                and result_output.find(CPP_TEST_STOP_SIGN) != -1):
            LOG.info("Find test stop")
            return True
        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
            LOG.error("Execute file not exist, result is %s" % result_output,
                      error_no="00402")
            raise LiteDeviceExecuteCommandError(ErrorMessage.Common.Code_0301005)
    elif input_command.startswith("zcat"):
        # zcat output is never treated as finished here; the caller's
        # timeout ends the read.
        return False
    elif input_command == "uname":
        is_linux_uname = "Linux" in result_output and "# " in result_output
        is_liteos_uname = "OHOS #" in result_output or "# " in result_output
        if is_linux_uname or is_liteos_uname:
            return True
    elif input_command.startswith("chmod +x") and "query.bin" in input_command:
        # str.find() returns -1 (truthy) when the text is missing, so a
        # membership test is required to single out the query.bin command;
        # other "chmod +x" commands fall through to the prompt check below.
        if PRODUCT_PARAMS_END in result_output:
            return True
    else:
        # Any other command is done once a shell prompt shows up again.
        if "OHOS #" in result_output or "# " in result_output:
            if input_command == "reboot" or input_command == "reset":
                return False
            if input_command.startswith("mount"):
                if "Mount nfs finished." not in result_output:
                    return False
            return True
    return False
112
113
def generate_report(receiver, result):
    """Feed a complete result to the receiver and close it.

    Nothing happens when either the result or the receiver is falsy.
    """
    if not (result and receiver):
        return
    receiver.__read__(result)
    receiver.__done__()
119
120
def get_current_time():
    """Return the local time as "YYYY-mm-dd HH:MM:SS.mmm" (millisecond part
    truncated to three digits)."""
    now = time.time()
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    fraction_ms = (now - int(now)) * 1000
    return "%s.%03d" % (stamp, fraction_ms)
127
128
129class LiteHelper:
130    @staticmethod
131    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
132                                        receiver=None):
133        """
134        Executes command on the device.
135
136        Parameters:
137            telnet:
138            command: the command to execute
139            timeout: timeout for read result
140            receiver: parser handler
141        """
142        from xdevice import Binder
143        time.sleep(2)
144        start_time = time.time()
145        status = True
146        error_message = ""
147        result = ""
148        if not telnet:
149            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303018)
150
151        telnet.write(command.encode('ascii') + b"\n")
152        while time.time() - start_time < timeout:
153            data = telnet.read_until(bytes(command, encoding="utf8"),
154                                     timeout=1)
155            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
156                "\r", "")
157            result = "{}{}".format(result, data)
158            if command in result:
159                break
160
161        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
162                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
163                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
164                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
165        while time.time() - start_time < timeout:
166            if not Binder.is_executing():
167                raise ExecuteTerminate(ErrorMessage.Common.Code_0301013)
168            _, _, data = telnet.expect(expect_result, timeout=1)
169            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
170                "\r", "")
171            result = "{}{}".format(result, data)
172            if receiver and data:
173                receiver.__read__(data)
174            if check_read_test_end(result, command):
175                break
176        else:
177            error_message = "execute %s timed out %s " % (command, timeout)
178            status = False
179
180        if receiver:
181            receiver.__done__()
182
183        if not status and command.startswith("uname"):
184            raise LiteDeviceTimeout(ErrorMessage.Device.Code_0303014.format(command))
185
186        return result, status, error_message
187
188    @staticmethod
189    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
190                               receiver=None):
191        input_command = command
192        start_time = time.time()
193        # 增加对执行测试套命令时
194        if HcpTestMode.hcp_mode and input_command.startswith("./") and input_command.find(".bin") and (
195                "--gtest_list_tests" not in input_command):
196            device_log_file = getattr(sys, "device_log_file", "")
197            LOG.info("device_log_file:{}".format(device_log_file))
198            result, status, error_message = LiteHelper.parser_hcp_test_suite_execute_log(com, start_time, timeout,
199                                                                                         input_command, device_log_file)
200        else:
201            # 非测试套执行命令
202            result, status, error_message = LiteHelper.parser_common_execute(com, start_time, timeout, input_command,
203                                                                             receiver)
204
205        if receiver:
206            receiver.__done__()
207
208        if not status and command.startswith("uname"):
209            raise LiteDeviceTimeout(ErrorMessage.Device.Code_0303014.format(command))
210        return result, status, error_message
211
212    @staticmethod
213    def parser_hcp_test_suite_execute_log(com, start_time, timeout, input_command, device_log_file):
214        from xdevice import Binder
215        result = ""
216        error_message = ""
217        status = True
218        # 增加对执行测试套命令时
219        device_log_file_open = os.open(device_log_file, os.O_WRONLY |
220                                       os.O_CREAT | os.O_APPEND,
221                                       FilePermission.mode_755)
222        try:
223            with os.fdopen(device_log_file_open, "a") as file_name:
224                while time.time() - start_time < timeout:
225                    if not Binder.is_executing():
226                        raise ExecuteTerminate(ErrorMessage.Common.Code_0301013)
227                    if com.in_waiting == 0:
228                        continue
229                    data = com.read(com.in_waiting).decode('gbk', errors='ignore')
230                    data = PATTERN.sub('', data).replace("\r", "")
231                    data = convert_mac(data)
232                    file_name.write(data)
233                    file_name.flush()
234                    time_diff = time.time() - start_time
235                    if timeout - 60 <= time_diff:
236                        if LiteHelper.check_string_in_file(device_log_file, CPP_TEST_END_SIGN):
237                            break
238                    if CPP_TEST_END_SIGN in data or "Gtest xml" in data or "output finished" in data:
239                        LOG.info("parser hcptest end:{}".format(data))
240                        break
241                else:
242                    error_message = "execute {} timed out {} ".format(input_command, timeout)
243                    status = False
244        except (UnicodeDecodeError, IOError, OSError) as exception:
245            if not getattr(exception, "error_no", ""):
246                setattr(exception, "error_no", "03403")
247            LOG.exception(exception, exc_info=True, error_no="03403")
248            raise exception
249        except Exception as exception:
250            exception_error = "run parser hcptest suite execute log error:{}".format(exception)
251            if not getattr(exception, "error_no", ""):
252                setattr(exception, "error_no", "03403")
253            LOG.exception(exception_error, exc_info=True, error_no="03403")
254            raise exception
255        finally:
256            LOG.debug("run parser hcptest end")
257        return " ", status, error_message
258
259    @staticmethod
260    def check_string_in_file(file_path, target_string):
261        try:
262            with open(file_path, 'r') as file:
263                for line in file:
264                    if target_string in line:
265                        LOG.info("{} is exist!".format(target_string))
266                        return True
267            return False
268        except FileNotFoundError:
269            LOG.info("{} doesn't exist!".format(target_string))
270            return False
271
272    @staticmethod
273    def parser_common_execute(com, start_time, timeout, input_command, receiver):
274        from xdevice import Binder
275        result = ""
276        error_message = ""
277        status = True
278        while time.time() - start_time < timeout:
279            if not Binder.is_executing():
280                raise ExecuteTerminate(ErrorMessage.Common.Code_0301013)
281            if com.in_waiting == 0:
282                continue
283            data = com.read(com.in_waiting).decode('gbk', errors='ignore')
284            data = PATTERN.sub('', data).replace("\r", "")
285            result = "{}{}".format(result, data)
286            if receiver and data:
287                receiver.__read__(data)
288            if check_read_test_end(result, input_command):
289                break
290        else:
291            error_message = "execute %s timed out %s " % (input_command, timeout)
292            status = False
293        return result, status, error_message
294
295    @staticmethod
296    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
297                                receiver=None):
298        result = ""
299        input_command = command
300
301        start = time.time()
302        from xdevice import Binder
303        while True:
304            if not Binder.is_executing():
305                raise ExecuteTerminate(ErrorMessage.Common.Code_0301013)
306            data = com.readline().decode('gbk', errors='ignore')
307            data = PATTERN.sub('', data)
308            if isinstance(input_command, list):
309                if len(data.strip()) > 0:
310                    data = "{} {}".format(get_current_time(), data)
311                    if data and receiver:
312                        receiver.__read__(data.replace("\r", ""))
313                    result = "{}{}".format(result, data.replace("\r", ""))
314                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
315                                 r"Ignored", data):
316                        start = time.time()
317                    if CTEST_END_SIGN in data:
318                        break
319                if (int(time.time()) - int(start)) > timeout:
320                    break
321            else:
322                result = "{}{}".format(
323                    result, data.replace("\r", "").replace("\n", "").strip())
324                if AT_CMD_ENDS in data:
325                    return result, True, ""
326                if (int(time.time()) - int(start)) > timeout:
327                    return result, False, ""
328
329        if receiver:
330            receiver.__done__()
331        LOG.info('Info: execute command success')
332        return result, True, ""
333
334    @staticmethod
335    def read_local_output(com=None, command=None, case_type="",
336                          timeout=TIMEOUT, receiver=None):
337        if case_type == DeviceTestType.ctest_lite:
338            return LiteHelper.read_local_output_ctest(com, command,
339                                                      timeout, receiver)
340        else:
341            return LiteHelper.read_local_output_test(com, command,
342                                                     timeout, receiver)
343
344    @staticmethod
345    def execute_local_cmd_with_timeout(com, **kwargs):
346        """
347        Execute command on the serial and read all the output from the serial.
348        """
349        args = kwargs
350        command = args.get("command", None)
351        input_command = command
352        case_type = args.get("case_type", "")
353        timeout = args.get("timeout", TIMEOUT)
354        receiver = args.get("receiver", None)
355        if not com:
356            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303010)
357
358        LOG.info("local_{} execute command shell {} with "
359                 "timeout {}s".format(com.port, convert_mac(command), str(timeout)))
360
361        if isinstance(command, str):
362            command = command.encode("utf-8")
363            if command[-2:] != b"\r\n":
364                command = command.rstrip() + b'\r\n'
365                com.write(command)
366        else:
367            com.write(command)
368        return LiteHelper.read_local_output(
369            com, command=input_command, case_type=case_type, timeout=timeout,
370            receiver=receiver)
371
372    @staticmethod
373    def execute_local_command(com, command):
374        """
375        Execute command on the serial and read all the output from the serial.
376        """
377        if not com:
378            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303010)
379
380        LOG.info(
381            "local_%s execute command shell %s" % (com.port, convert_mac(command)))
382        command = command.encode("utf-8")
383        if command[-2:] != b"\r\n":
384            command = command.rstrip() + b'\r\n'
385            com.write(command)
386