• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18import json
19import time
20import re
21
22from xdevice import DeviceTestType
23from xdevice import ExecuteTerminate
24from xdevice import platform_logger
25from ohos.exception import LiteDeviceTimeout
26from ohos.exception import LiteDeviceConnectError
27from ohos.exception import LiteDeviceExecuteCommandError
28
29
# Names exported by a star-import of this module.
__all__ = ["generate_report", "LiteHelper"]

# Markers looked up in device output to decide whether a command finished.
# gtest banner: the end check treats its second occurrence as completion.
CPP_TEST_STANDARD_SIGN = "[==========]"
CPP_TEST_END_SIGN = "Gtest xml output finished"
# One of the patterns the telnet reader waits for (presumably the device
# shell prompt -- confirm against the device firmware).
CPP_SYS_STANDARD_SIGN = "OHOS #"
# Prefix of the message checked to detect a missing "./" executable.
CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
CTEST_STANDARD_SIGN = "Start to run test suite"
# Substring that ends the read loop for non-list (AT style) serial commands.
AT_CMD_ENDS = "OK"
# Substring that ends the read loop for ctest (list) serial commands.
CTEST_END_SIGN = "All the test suites finished"
# These two must both be present for the end check to report "test stop".
CPP_TEST_STOP_SIGN = "Test Stop"
CPP_TEST_MOUNT_SIGN = "not mount properly"

# JS unit run is considered finished once both markers have been seen.
_START_JSUNIT_RUN_MARKER = "[start] start run suites"
_END_JSUNIT_RUN_MARKER = "[end] run suites end"
INSTALL_END_MARKER = "resultMessage is install success !"

# Matches an ESC byte followed by any number of "[<n>;<n>...m" groups; used
# to strip terminal colour sequences from device output before parsing.
PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
# Default read timeout, in seconds (compared against time.time() deltas).
TIMEOUT = 90
STATUS_OK_CODE = 200
LOG = platform_logger("DmlibLite")
50
51
def check_open_source_test(result_output):
    """
    Tell whether the output looks like a finished non-gtest test run.

    Parameters:
        result_output: device output following the command echo

    Returns:
        True when the gtest banner is absent and the output contains, in any
        letter case, one of "test pass", "test fail", "tests pass" or
        "tests fail"; False otherwise.
    """
    # Output carrying the gtest banner is handled by the gtest end checks.
    if CPP_TEST_STANDARD_SIGN in result_output:
        return False

    lowered = result_output.lower()
    verdicts = ("test pass", "test fail", "tests pass", "tests fail")
    return any(verdict in lowered for verdict in verdicts)
60
61
def check_read_test_end(result=None, input_command=None):
    """
    Decide whether the output read so far shows that the command finished.

    Parameters:
        result: all device output accumulated so far (str)
        input_command: the command that was sent to the device (str)

    Returns:
        True when an end condition for this kind of command is found in the
        output that follows the first echo of the command, False otherwise.
        Commands starting with "zcat" never report an end here.

    Raises:
        LiteDeviceExecuteCommandError: a "./" command was answered with the
            "no such file or directory" message for that executable.
    """
    # Only the text after the first echo of the command is inspected.
    # NOTE(review): when the echo is missing, find() returns -1 and the
    # slice starts at len(input_command) - 1 instead; kept as-is because
    # the readers' timing may rely on it -- confirm before changing.
    index = result.find(input_command) + len(input_command)
    result_output = result[index:]

    if input_command.startswith("./"):
        # gtest binary: second banner or the explicit xml-finished line.
        if CPP_TEST_STANDARD_SIGN in result_output:
            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
                    CPP_TEST_END_SIGN in result_output:
                return True
        if check_open_source_test(result_output):
            return True
        # Both JS unit markers must be found at offset 1 or later.
        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
            return True

        if INSTALL_END_MARKER in result_output:
            return True
        if (CPP_TEST_MOUNT_SIGN in result_output
                and CPP_TEST_STOP_SIGN in result_output):
            LOG.info("Find test stop")
            return True
        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
            LOG.error("Execute file not exist, result is %s" % result_output,
                      error_no="00402")
            raise LiteDeviceExecuteCommandError("execute file not exist",
                                                error_no="00402")
        return False

    if input_command.startswith("zcat"):
        return False

    # Any other command: wait for a "#"-style prompt after the echo.
    if "OHOS #" in result_output or "# " in result_output:
        # reboot/reset are never reported as finished by this check.
        if input_command in ("reboot", "reset"):
            return False
        # A mount is only done once the nfs confirmation has been printed.
        if input_command.startswith("mount") and \
                "Mount nfs finished." not in result_output:
            return False
        return True
    return False
99
100
def generate_report(receiver, result):
    """
    Feed a complete result to a receiver and tell it the stream is over.

    Parameters:
        receiver: parser handler exposing __read__ and __done__
        result: the output to hand over

    Nothing happens when either argument is falsy.
    """
    if not (result and receiver):
        return
    receiver.__read__(result)
    receiver.__done__()
106
107
def get_current_time():
    """
    Return the local time as "YYYY-mm-dd HH:MM:SS.mmm".

    The millisecond part is the fractional part of time.time() scaled by
    1000 and truncated by the "%03d" conversion.
    """
    now = time.time()
    fraction_ms = (now - int(now)) * 1000
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    return "%s.%03d" % (stamp, fraction_ms)
114
115
class LiteHelper:
    """
    Static helpers that send a command to a lite device over telnet or a
    serial port and read the output until an end condition or a timeout.
    """

    @staticmethod
    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
                                        receiver=None):
        """
        Executes command on the device.

        Parameters:
            telnet: connected session object providing write(),
                read_until() and expect()
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message): the cleaned output, False for
            status when the end condition was not seen before the timeout,
            and a message describing that timeout (empty otherwise).

        Raises:
            LiteDeviceConnectError: telnet is falsy.
            ExecuteTerminate: the scheduler stopped executing.
            LiteDeviceTimeout: a command starting with "uname" timed out.
        """
        # Imported here rather than at module level, as in the other readers.
        from xdevice import Scheduler
        time.sleep(2)
        start_time = time.time()
        status = True
        error_message = ""
        result = ""
        if not telnet:
            raise LiteDeviceConnectError("remote device is not connected.",
                                         error_no="00402")

        telnet.write(command.encode('ascii') + b"\n")
        # First wait until the device has echoed the command back.
        while time.time() - start_time < timeout:
            data = telnet.read_until(bytes(command, encoding="utf8"),
                                     timeout=1)
            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
                "\r", "")
            result = "{}{}".format(result, data)
            if command in result:
                break

        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
        # Then collect output in 1 second slices until the end check passes.
        while time.time() - start_time < timeout:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            _, _, data = telnet.expect(expect_result, timeout=1)
            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
                "\r", "")
            result = "{}{}".format(result, data)
            if receiver and data:
                receiver.__read__(data)
            if check_read_test_end(result, command):
                break
        else:
            # Loop ended without break: the time budget ran out.
            error_message = "execute %s timed out %s " % (command, timeout)
            status = False

        if receiver:
            receiver.__done__()

        if not status and command.startswith("uname"):
            raise LiteDeviceTimeout("Execute command time out:%s" % command)

        return result, status, error_message

    @staticmethod
    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
                               receiver=None):
        """
        Read serial output of a test command until it ends or times out.

        Parameters:
            com: serial port object providing in_waiting and read()
            command: the command that was written to the port
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message), with the same meaning as in
            execute_remote_cmd_with_timeout.

        Raises:
            ExecuteTerminate: the scheduler stopped executing.
            LiteDeviceTimeout: a command starting with "uname" timed out.
        """
        from xdevice import Scheduler
        input_command = command
        error_message = ""
        start_time = time.time()
        result = ""
        status = True
        while time.time() - start_time < timeout:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            # Nothing buffered yet: poll again.
            if com.in_waiting == 0:
                continue
            data = com.read(com.in_waiting).decode('gbk', errors='ignore')
            data = PATTERN.sub('', data).replace("\r", "")
            result = "{}{}".format(result, data)
            if receiver and data:
                receiver.__read__(data)
            if check_read_test_end(result, input_command):
                break
        else:
            # Loop ended without break: the time budget ran out.
            error_message = "execute %s timed out %s " % (command, timeout)
            status = False

        if receiver:
            receiver.__done__()

        if not status and command.startswith("uname"):
            raise LiteDeviceTimeout("Execute command time out:%s" % command)

        return result, status, error_message

    @staticmethod
    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
                                receiver=None):
        """
        Read serial output line by line for ctest or AT style commands.

        Parameters:
            com: serial port object providing readline()
            command: a list selects the ctest path; anything else selects
                the path that waits for the "OK" marker
            timeout: seconds allowed without progress (see below)
            receiver: parser handler, only used on the list path

        Returns:
            (result, status, error_message). On the list path status is
            always True, also when the loop was left because of the timeout.
            On the other path status is True once a line contains the "OK"
            marker and False on timeout; error_message is always empty.

        Raises:
            ExecuteTerminate: the scheduler stopped executing.
        """
        from xdevice import Scheduler
        result = ""
        input_command = command

        start = time.time()
        while True:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            data = com.readline().decode('gbk', errors='ignore')
            data = PATTERN.sub('', data)
            if isinstance(input_command, list):
                if len(data.strip()) > 0:
                    # Prefix every non-empty line with a local timestamp.
                    data = "{} {}".format(get_current_time(), data)
                    if data and receiver:
                        receiver.__read__(data.replace("\r", ""))
                    result = "{}{}".format(result, data.replace("\r", ""))
                    # A suite summary line restarts the timeout window.
                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
                                 r"Ignored", data):
                        start = time.time()
                    if CTEST_END_SIGN in data:
                        break
                if (int(time.time()) - int(start)) > timeout:
                    break
            else:
                result = "{}{}".format(
                    result, data.replace("\r", "").replace("\n", "").strip())
                if AT_CMD_ENDS in data:
                    return result, True, ""
                if (int(time.time()) - int(start)) > timeout:
                    return result, False, ""

        if receiver:
            receiver.__done__()
        LOG.info('Info: execute command success')
        return result, True, ""

    @staticmethod
    def read_local_output(com=None, command=None, case_type="",
                          timeout=TIMEOUT, receiver=None):
        """
        Dispatch to the ctest reader for DeviceTestType.ctest_lite and to
        the generic test reader for every other case type.

        Returns:
            the (result, status, error_message) tuple of the chosen reader.
        """
        if case_type == DeviceTestType.ctest_lite:
            return LiteHelper.read_local_output_ctest(com, command,
                                                      timeout, receiver)
        return LiteHelper.read_local_output_test(com, command,
                                                 timeout, receiver)

    @staticmethod
    def execute_local_cmd_with_timeout(com, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Keyword arguments:
            command: str (encoded as utf-8 and terminated with CRLF) or any
                other object, which is written to the port unchanged
            case_type: selects the reader, see read_local_output
            timeout: timeout for read result, defaults to TIMEOUT
            receiver: parser handler

        Returns:
            the (result, status, error_message) tuple of read_local_output.

        Raises:
            LiteDeviceConnectError: com is falsy.
        """
        args = kwargs
        command = args.get("command", None)
        # The reader needs the command exactly as the caller passed it.
        input_command = command
        case_type = args.get("case_type", "")
        timeout = args.get("timeout", TIMEOUT)
        receiver = args.get("receiver", None)
        if not com:
            raise LiteDeviceConnectError("local device is not connected.",
                                         error_no="00402")

        LOG.info("local_%s execute command shell %s with timeout %ss" %
                 (com.port, command, str(timeout)))

        if isinstance(command, str):
            command = command.encode("utf-8")
            if command[-2:] != b"\r\n":
                command = command.rstrip() + b'\r\n'
        # Always send the command; previously one that already ended with
        # CRLF was never written because the write sat inside the branch.
        com.write(command)
        return LiteHelper.read_local_output(
            com, command=input_command, case_type=case_type, timeout=timeout,
            receiver=receiver)

    @staticmethod
    def execute_local_command(com, command):
        """
        Write a command to the serial port without reading any output.

        Parameters:
            com: serial port object providing port and write()
            command: str, sent utf-8 encoded and terminated with CRLF

        Raises:
            LiteDeviceConnectError: com is falsy.
        """
        if not com:
            raise LiteDeviceConnectError("local device is not connected.",
                                         error_no="00402")

        LOG.info(
            "local_%s execute command shell %s" % (com.port, command))
        command = command.encode("utf-8")
        if command[-2:] != b"\r\n":
            command = command.rstrip() + b'\r\n'
        # Always send the command; previously one that already ended with
        # CRLF was never written because the write sat inside the branch.
        com.write(command)
306