• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import time
20import re
21
22from xdevice import DeviceTestType
23from xdevice import ExecuteTerminate
24from xdevice import platform_logger
25from ohos.exception import LiteDeviceTimeout
26from ohos.exception import LiteDeviceConnectError
27from ohos.exception import LiteDeviceExecuteCommandError
28
29
30__all__ = ["generate_report", "LiteHelper"]
31
32CPP_TEST_STANDARD_SIGN = "[==========]"
33CPP_TEST_END_SIGN = "Gtest xml output finished"
34CPP_SYS_STANDARD_SIGN = "OHOS #"
35CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
36CTEST_STANDARD_SIGN = "Start to run test suite"
37AT_CMD_ENDS = "OK"
38CTEST_END_SIGN = "All the test suites finished"
39CPP_TEST_STOP_SIGN = "Test Stop"
40CPP_TEST_MOUNT_SIGN = "not mount properly"
41
42_START_JSUNIT_RUN_MARKER = "[start] start run suites"
43_END_JSUNIT_RUN_MARKER = "[end] run suites end"
44INSTALL_END_MARKER = "resultMessage is install success !"
45
46PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
47TIMEOUT = 90
48STATUS_OK_CODE = 200
49LOG = platform_logger("DmlibLite")
50
51
52def check_open_source_test(result_output):
53    if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \
54            ("test pass" in result_output.lower() or
55             "test fail" in result_output.lower() or
56             "tests pass" in result_output.lower() or
57             "tests fail" in result_output.lower()):
58        return True
59    return False
60
61
62def check_read_test_end(result=None, input_command=None):
63    temp_result = result.replace("\n", "")
64    index = result.find(input_command) + len(input_command)
65    result_output = result[index:]
66    if input_command.startswith("./"):
67        if result_output.find(CPP_TEST_STANDARD_SIGN) != -1:
68            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
69                    result_output.find(CPP_TEST_END_SIGN) != -1:
70                return True
71        if check_open_source_test(result_output):
72            return True
73        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
74                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
75            return True
76
77        if result_output.find(INSTALL_END_MARKER) != -1:
78            return True
79        if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1
80                and result_output.find(CPP_TEST_STOP_SIGN) != -1):
81            LOG.info("Find test stop")
82            return True
83        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
84            LOG.error("Execute file not exist, result is %s" % result_output,
85                      error_no="00402")
86            raise LiteDeviceExecuteCommandError("execute file not exist",
87                                                error_no="00402")
88    elif input_command.startswith("zcat"):
89        return False
90    else:
91        if "OHOS #" in result_output or "# " in result_output:
92            if input_command == "reboot" or input_command == "reset":
93                return False
94            if input_command.startswith("mount"):
95                if "Mount nfs finished." not in result_output:
96                    return False
97            return True
98    return False
99
100
101def generate_report(receiver, result):
102    if result and receiver:
103        if result:
104            receiver.__read__(result)
105            receiver.__done__()
106
107
108def get_current_time():
109    current_time = time.time()
110    local_time = time.localtime(current_time)
111    data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
112    millisecond = (current_time - int(current_time)) * 1000
113    return "%s.%03d" % (data_head, millisecond)
114
115
116class LiteHelper:
117    @staticmethod
118    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
119                                        receiver=None):
120        """
121        Executes command on the device.
122
123        Parameters:
124            telnet:
125            command: the command to execute
126            timeout: timeout for read result
127            receiver: parser handler
128        """
129        from xdevice import Scheduler
130        time.sleep(2)
131        start_time = time.time()
132        status = True
133        error_message = ""
134        result = ""
135        if not telnet:
136            raise LiteDeviceConnectError("remote device is not connected.",
137                                         error_no="00402")
138
139        telnet.write(command.encode('ascii') + b"\n")
140        while time.time() - start_time < timeout:
141            data = telnet.read_until(bytes(command, encoding="utf8"),
142                                     timeout=1)
143            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
144                "\r", "")
145            result = "{}{}".format(result, data)
146            if command in result:
147                break
148
149        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
150                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
151                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
152                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
153        while time.time() - start_time < timeout:
154            if not Scheduler.is_execute:
155                raise ExecuteTerminate("Execute terminate", error_no="00300")
156            _, _, data = telnet.expect(expect_result, timeout=1)
157            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
158                "\r", "")
159            result = "{}{}".format(result, data)
160            if receiver and data:
161                receiver.__read__(data)
162            if check_read_test_end(result, command):
163                break
164        else:
165            error_message = "execute %s timed out %s " % (command, timeout)
166            status = False
167
168        if receiver:
169            receiver.__done__()
170
171        if not status and command.startswith("uname"):
172            raise LiteDeviceTimeout("Execute command time out:%s" % command)
173
174        return result, status, error_message
175
176    @staticmethod
177    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
178                               receiver=None):
179        input_command = command
180        linux_end_command = ""
181        if "--gtest_output=" in command:
182            linux_end_command = input_command.split(":")[1].split(
183                "reports")[0].rstrip("/") + " #"
184        error_message = ""
185        start_time = time.time()
186        result = ""
187        status = True
188        from xdevice import Scheduler
189        while time.time() - start_time < timeout:
190            if not Scheduler.is_execute:
191                raise ExecuteTerminate("Execute terminate", error_no="00300")
192            data = com.readline().decode('gbk', errors='ignore')
193            data = PATTERN.sub('', data).replace("\r", "")
194            result = "{}{}".format(result, data)
195            if receiver and data:
196                receiver.__read__(data)
197            if check_read_test_end(result, input_command):
198                break
199        else:
200            error_message = "execute %s timed out %s " % (command, timeout)
201            status = False
202
203        if receiver:
204            receiver.__done__()
205
206        if not status and command.startswith("uname"):
207            raise LiteDeviceTimeout("Execute command time out:%s" % command)
208
209        return result, status, error_message
210
211    @staticmethod
212    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
213                                receiver=None):
214        result = ""
215        input_command = command
216
217        start = time.time()
218        from xdevice import Scheduler
219        while True:
220            if not Scheduler.is_execute:
221                raise ExecuteTerminate("Execute terminate", error_no="00300")
222            data = com.readline().decode('gbk', errors='ignore')
223            data = PATTERN.sub('', data)
224            if isinstance(input_command, list):
225                if len(data.strip()) > 0:
226                    data = "{} {}".format(get_current_time(), data)
227                    if data and receiver:
228                        receiver.__read__(data.replace("\r", ""))
229                    result = "{}{}".format(result, data.replace("\r", ""))
230                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
231                                 r"Ignored", data):
232                        start = time.time()
233                    if CTEST_END_SIGN in data:
234                        break
235                if (int(time.time()) - int(start)) > timeout:
236                    break
237            else:
238                result = "{}{}".format(
239                    result, data.replace("\r", "").replace("\n", "").strip())
240                if AT_CMD_ENDS in data:
241                    return result, True, ""
242                if (int(time.time()) - int(start)) > timeout:
243                    return result, False, ""
244
245        if receiver:
246            receiver.__done__()
247        LOG.info('Info: execute command success')
248        return result, True, ""
249
250    @staticmethod
251    def read_local_output(com=None, command=None, case_type="",
252                          timeout=TIMEOUT, receiver=None):
253        if case_type == DeviceTestType.ctest_lite:
254            return LiteHelper.read_local_output_ctest(com, command,
255                                                      timeout, receiver)
256        else:
257            return LiteHelper.read_local_output_test(com, command,
258                                                     timeout, receiver)
259
260    @staticmethod
261    def execute_local_cmd_with_timeout(com, **kwargs):
262        """
263        Execute command on the serial and read all the output from the serial.
264        """
265        args = kwargs
266        command = args.get("command", None)
267        input_command = command
268        case_type = args.get("case_type", "")
269        timeout = args.get("timeout", TIMEOUT)
270        receiver = args.get("receiver", None)
271        if not com:
272            raise LiteDeviceConnectError("local device is not connected.",
273                                         error_no="00402")
274
275        LOG.info("local_%s execute command shell %s with timeout %ss" %
276                 (com.port, command, str(timeout)))
277
278        if isinstance(command, str):
279            command = command.encode("utf-8")
280            if command[-2:] != b"\r\n":
281                command = command.rstrip() + b'\r\n'
282                com.write(command)
283        else:
284            com.write(command)
285        return LiteHelper.read_local_output(
286            com, command=input_command, case_type=case_type, timeout=timeout,
287            receiver=receiver)
288
289    @staticmethod
290    def execute_local_command(com, command):
291        """
292        Execute command on the serial and read all the output from the serial.
293        """
294        if not com:
295            raise LiteDeviceConnectError("local device is not connected.",
296                                         error_no="00402")
297
298        LOG.info(
299            "local_%s execute command shell %s" % (com.port, command))
300        command = command.encode("utf-8")
301        if command[-2:] != b"\r\n":
302            command = command.rstrip() + b'\r\n'
303            com.write(command)
304