• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import time
20import re
21
22from xdevice import DeviceTestType
23from xdevice import ExecuteTerminate
24from xdevice import platform_logger
25from ohos.exception import LiteDeviceTimeout
26from ohos.exception import LiteDeviceConnectError
27from ohos.exception import LiteDeviceExecuteCommandError
28
29__all__ = ["generate_report", "LiteHelper"]
30
31CPP_TEST_STANDARD_SIGN = "[==========]"
32CPP_TEST_END_SIGN = "Gtest xml output finished"
33CPP_SYS_STANDARD_SIGN = "OHOS #"
34CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
35CTEST_STANDARD_SIGN = "Start to run test suite"
36AT_CMD_ENDS = "OK"
37CTEST_END_SIGN = "All the test suites finished"
38CPP_TEST_STOP_SIGN = "Test Stop"
39CPP_TEST_MOUNT_SIGN = "not mount properly"
40
41_START_JSUNIT_RUN_MARKER = "[start] start run suites"
42_END_JSUNIT_RUN_MARKER = "[end] run suites end"
43INSTALL_END_MARKER = "resultMessage is install success !"
44PRODUCT_PARAMS_START = "To Obtain Product Params Start"
45PRODUCT_PARAMS_END = "To Obtain Product Params End"
46
47PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
48TIMEOUT = 90
49STATUS_OK_CODE = 200
50LOG = platform_logger("DmlibLite")
51
52
53def check_open_source_test(result_output):
54    if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \
55            ("test pass" in result_output.lower() or
56             "test fail" in result_output.lower() or
57             "tests pass" in result_output.lower() or
58             "tests fail" in result_output.lower()):
59        return True
60    return False
61
62
63def check_read_test_end(result=None, input_command=None):
64    temp_result = result.replace("\n", "")
65    index = result.find(input_command) + len(input_command)
66    result_output = result[index:]
67    if input_command.startswith("./"):
68        if result_output.find(CPP_TEST_STANDARD_SIGN) != -1:
69            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
70                    result_output.find(CPP_TEST_END_SIGN) != -1:
71                return True
72        if check_open_source_test(result_output):
73            return True
74        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
75                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
76            return True
77
78        if result_output.find(INSTALL_END_MARKER) != -1:
79            return True
80        if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1
81                and result_output.find(CPP_TEST_STOP_SIGN) != -1):
82            LOG.info("Find test stop")
83            return True
84        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
85            LOG.error("Execute file not exist, result is %s" % result_output,
86                      error_no="00402")
87            raise LiteDeviceExecuteCommandError("execute file not exist",
88                                                error_no="00402")
89    elif input_command.startswith("zcat"):
90        return False
91    elif input_command == "uname":
92        if ("Linux" in result_output and "# " in result_output) \
93              or ("OHOS #" in result_output or "#" in result_output):
94            return True
95    elif input_command.startswith("chmod +x") and input_command.find("query.bin"):
96        if PRODUCT_PARAMS_END in result_output:
97            return True
98    else:
99        if "OHOS #" in result_output or "# " in result_output:
100            if input_command == "reboot" or input_command == "reset":
101                return False
102            if input_command.startswith("mount"):
103                if "Mount nfs finished." not in result_output:
104                    return False
105            return True
106    return False
107
108
109def generate_report(receiver, result):
110    if result and receiver:
111        if result:
112            receiver.__read__(result)
113            receiver.__done__()
114
115
116def get_current_time():
117    current_time = time.time()
118    local_time = time.localtime(current_time)
119    data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
120    millisecond = (current_time - int(current_time)) * 1000
121    return "%s.%03d" % (data_head, millisecond)
122
123
124class LiteHelper:
125    @staticmethod
126    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
127                                        receiver=None):
128        """
129        Executes command on the device.
130
131        Parameters:
132            telnet:
133            command: the command to execute
134            timeout: timeout for read result
135            receiver: parser handler
136        """
137        from xdevice import Scheduler
138        time.sleep(2)
139        start_time = time.time()
140        status = True
141        error_message = ""
142        result = ""
143        if not telnet:
144            raise LiteDeviceConnectError("remote device is not connected.",
145                                         error_no="00402")
146
147        telnet.write(command.encode('ascii') + b"\n")
148        while time.time() - start_time < timeout:
149            data = telnet.read_until(bytes(command, encoding="utf8"),
150                                     timeout=1)
151            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
152                "\r", "")
153            result = "{}{}".format(result, data)
154            if command in result:
155                break
156
157        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
158                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
159                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
160                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
161        while time.time() - start_time < timeout:
162            if not Scheduler.is_execute:
163                raise ExecuteTerminate("Execute terminate", error_no="00300")
164            _, _, data = telnet.expect(expect_result, timeout=1)
165            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
166                "\r", "")
167            result = "{}{}".format(result, data)
168            if receiver and data:
169                receiver.__read__(data)
170            if check_read_test_end(result, command):
171                break
172        else:
173            error_message = "execute %s timed out %s " % (command, timeout)
174            status = False
175
176        if receiver:
177            receiver.__done__()
178
179        if not status and command.startswith("uname"):
180            raise LiteDeviceTimeout("Execute command time out:%s" % command)
181
182        return result, status, error_message
183
184    @staticmethod
185    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
186                               receiver=None):
187        input_command = command
188        linux_end_command = ""
189        if "--gtest_output=" in command:
190            linux_end_command = input_command.split(":")[1].split(
191                "reports")[0].rstrip("/") + " #"
192        error_message = ""
193        start_time = time.time()
194        result = ""
195        status = True
196        from xdevice import Scheduler
197        while time.time() - start_time < timeout:
198            if not Scheduler.is_execute:
199                raise ExecuteTerminate("Execute terminate", error_no="00300")
200            if com.in_waiting == 0:
201                continue
202            data = com.read(com.in_waiting).decode('gbk', errors='ignore')
203            data = PATTERN.sub('', data).replace("\r", "")
204            result = "{}{}".format(result, data)
205            if receiver and data:
206                receiver.__read__(data)
207            if check_read_test_end(result, input_command):
208                break
209        else:
210            error_message = "execute %s timed out %s " % (command, timeout)
211            status = False
212
213        if receiver:
214            receiver.__done__()
215
216        if not status and command.startswith("uname"):
217            raise LiteDeviceTimeout("Execute command time out:%s" % command)
218
219        return result, status, error_message
220
221    @staticmethod
222    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
223                                receiver=None):
224        result = ""
225        input_command = command
226
227        start = time.time()
228        from xdevice import Scheduler
229        while True:
230            if not Scheduler.is_execute:
231                raise ExecuteTerminate("Execute terminate", error_no="00300")
232            data = com.readline().decode('gbk', errors='ignore')
233            data = PATTERN.sub('', data)
234            if isinstance(input_command, list):
235                if len(data.strip()) > 0:
236                    data = "{} {}".format(get_current_time(), data)
237                    if data and receiver:
238                        receiver.__read__(data.replace("\r", ""))
239                    result = "{}{}".format(result, data.replace("\r", ""))
240                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
241                                 r"Ignored", data):
242                        start = time.time()
243                    if CTEST_END_SIGN in data:
244                        break
245                if (int(time.time()) - int(start)) > timeout:
246                    break
247            else:
248                result = "{}{}".format(
249                    result, data.replace("\r", "").replace("\n", "").strip())
250                if AT_CMD_ENDS in data:
251                    return result, True, ""
252                if (int(time.time()) - int(start)) > timeout:
253                    return result, False, ""
254
255        if receiver:
256            receiver.__done__()
257        LOG.info('Info: execute command success')
258        return result, True, ""
259
260    @staticmethod
261    def read_local_output(com=None, command=None, case_type="",
262                          timeout=TIMEOUT, receiver=None):
263        if case_type == DeviceTestType.ctest_lite:
264            return LiteHelper.read_local_output_ctest(com, command,
265                                                      timeout, receiver)
266        else:
267            return LiteHelper.read_local_output_test(com, command,
268                                                     timeout, receiver)
269
270    @staticmethod
271    def execute_local_cmd_with_timeout(com, **kwargs):
272        """
273        Execute command on the serial and read all the output from the serial.
274        """
275        args = kwargs
276        command = args.get("command", None)
277        input_command = command
278        case_type = args.get("case_type", "")
279        timeout = args.get("timeout", TIMEOUT)
280        receiver = args.get("receiver", None)
281        if not com:
282            raise LiteDeviceConnectError("local device is not connected.",
283                                         error_no="00402")
284
285        LOG.info("local_%s execute command shell %s with timeout %ss" %
286                 (com.port, command, str(timeout)))
287
288        if isinstance(command, str):
289            command = command.encode("utf-8")
290            if command[-2:] != b"\r\n":
291                command = command.rstrip() + b'\r\n'
292                com.write(command)
293        else:
294            com.write(command)
295        return LiteHelper.read_local_output(
296            com, command=input_command, case_type=case_type, timeout=timeout,
297            receiver=receiver)
298
299    @staticmethod
300    def execute_local_command(com, command):
301        """
302        Execute command on the serial and read all the output from the serial.
303        """
304        if not com:
305            raise LiteDeviceConnectError("local device is not connected.",
306                                         error_no="00402")
307
308        LOG.info(
309            "local_%s execute command shell %s" % (com.port, command))
310        command = command.encode("utf-8")
311        if command[-2:] != b"\r\n":
312            command = command.rstrip() + b'\r\n'
313            com.write(command)
314