• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import time
21import re
22
23from _core.constants import DeviceTestType
24from _core.exception import ExecuteTerminate
25from _core.exception import LiteDeviceTimeout
26from _core.exception import LiteDeviceConnectError
27from _core.exception import LiteDeviceExecuteCommandError
28from _core.logger import platform_logger
29from _core.utils import convert_port
30
31__all__ = ["generate_report", "LiteHelper"]
32
33CPP_TEST_STANDARD_SIGN = "[==========]"
34CPP_TEST_END_SIGN = "Gtest xml output finished"
35CPP_SYS_STANDARD_SIGN = "OHOS #"
36CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
37CTEST_STANDARD_SIGN = "Start to run test suite"
38AT_CMD_ENDS = "OK"
39CTEST_END_SIGN = "All the test suites finished"
40CPP_TEST_STOP_SIGN = "Test Stop"
41CPP_TEST_MOUNT_SIGN = "not mount properly"
42
43_START_JSUNIT_RUN_MARKER = "[start] start run suites"
44_END_JSUNIT_RUN_MARKER = "[end] run suites end"
45INSTALL_END_MARKER = "resultMessage is install success !"
46
47PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
48TIMEOUT = 90
49LOG = platform_logger("DmlibLite")
50
51
52def check_open_source_test(result_output):
53    if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \
54            ("test pass" in result_output.lower() or
55             "test fail" in result_output.lower() or
56             "tests pass" in result_output.lower() or
57             "tests fail" in result_output.lower()):
58        return True
59    return False
60
61
62def check_read_test_end(result=None, input_command=None):
63    temp_result = result.replace("\n", "")
64    # if input_command not in temp_result:
65    #     return False
66    index = result.find(input_command) + len(input_command)
67    result_output = result[index:]
68    if input_command.startswith("./"):
69        if result_output.find(CPP_TEST_STANDARD_SIGN) != -1:
70            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
71                    result_output.find(CPP_TEST_END_SIGN) != -1:
72                return True
73        if check_open_source_test(result_output):
74            return True
75        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
76                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
77            return True
78
79        if result_output.find(INSTALL_END_MARKER) != -1:
80            return True
81        if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1
82                and result_output.find(CPP_TEST_STOP_SIGN) != -1):
83            LOG.info("find test stop")
84            return True
85        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
86            LOG.error("execute file not exist, result is %s" % result_output,
87                      error_no="00402")
88            raise LiteDeviceExecuteCommandError("execute file not exist",
89                                                error_no="00402")
90    elif input_command.startswith("zcat"):
91        return False
92    else:
93        if "OHOS #" in result_output or "# " in result_output:
94            if input_command == "reboot" or input_command == "reset":
95                return False
96            if input_command.startswith("mount"):
97                if "Mount nfs finished." not in result_output:
98                    return False
99            return True
100    return False
101
102
103def generate_report(receiver, result):
104    if result and receiver:
105        if result:
106            receiver.__read__(result)
107            receiver.__done__()
108
109
110def get_current_time():
111    current_time = time.time()
112    local_time = time.localtime(current_time)
113    data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
114    millisecond = (current_time - int(current_time)) * 1000
115    return "%s.%03d" % (data_head, millisecond)
116
117
118class LiteHelper:
119    @staticmethod
120    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
121                                        receiver=None):
122        """
123        Executes command on the device.
124
125        Parameters:
126            telnet:
127            command: the command to execute
128            timeout: timeout for read result
129            receiver: parser handler
130        """
131        from xdevice import Scheduler
132        time.sleep(2)
133        start_time = time.time()
134        status = True
135        error_message = ""
136        result = ""
137        if not telnet:
138            raise LiteDeviceConnectError("remote device is not connected.",
139                                         error_no="00402")
140
141        telnet.write(command.encode('ascii') + b"\n")
142        while time.time() - start_time < timeout:
143            data = telnet.read_until(bytes(command, encoding="utf8"),
144                                     timeout=1)
145            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
146                "\r", "")
147            result = "{}{}".format(result, data)
148            if command in result:
149                break
150
151        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
152                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
153                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
154                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
155        while time.time() - start_time < timeout:
156            if not Scheduler.is_execute:
157                raise ExecuteTerminate("Execute terminate", error_no="00300")
158            _, _, data = telnet.expect(expect_result, timeout=1)
159            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
160                "\r", "")
161            result = "{}{}".format(result, data)
162            if receiver and data:
163                receiver.__read__(data)
164            if check_read_test_end(result, command):
165                break
166        else:
167            error_message = "execute %s timed out %s " % (command, timeout)
168            status = False
169
170        if receiver:
171            receiver.__done__()
172
173        if not status and command.startswith("uname"):
174            raise LiteDeviceTimeout("Execute command time out:%s" % command)
175
176        return result, status, error_message
177
178    @staticmethod
179    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
180                               receiver=None):
181        input_command = command
182        linux_end_command = ""
183        if "--gtest_output=" in command:
184            linux_end_command = input_command.split(":")[1].split(
185                "reports")[0].rstrip("/") + " #"
186        error_message = ""
187        start_time = time.time()
188        result = ""
189        status = True
190        from xdevice import Scheduler
191
192        # while time.time() - start_time < timeout:
193        #     data = com.readline().decode('gbk', errors='ignore')
194        #     data = PATTERN.sub('', data).replace("\r", "")
195        #     result = "{}{}".format(result, data)
196        #     if command in result or linux_end_command in result:
197        #         break
198
199        while time.time() - start_time < timeout:
200            if not Scheduler.is_execute:
201                raise ExecuteTerminate("Execute terminate", error_no="00300")
202            data = com.readline().decode('gbk', errors='ignore')
203            data = PATTERN.sub('', data).replace("\r", "")
204            result = "{}{}".format(result, data)
205            if receiver and data:
206                receiver.__read__(data)
207            if check_read_test_end(result, input_command):
208                break
209        else:
210            error_message = "execute %s timed out %s " % (command, timeout)
211            status = False
212
213        if receiver:
214            receiver.__done__()
215
216        if not status and command.startswith("uname"):
217            raise LiteDeviceTimeout("Execute command time out:%s" % command)
218
219        return result, status, error_message
220
221    @staticmethod
222    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
223                                receiver=None):
224        result = ""
225        input_command = command
226
227        start = time.time()
228        from xdevice import Scheduler
229        while True:
230            if not Scheduler.is_execute:
231                raise ExecuteTerminate("Execute terminate", error_no="00300")
232            data = com.readline().decode('gbk', errors='ignore')
233            data = PATTERN.sub('', data)
234            if isinstance(input_command, list):
235                if len(data.strip()) > 0:
236                    data = "{} {}".format(get_current_time(), data)
237                    if data and receiver:
238                        receiver.__read__(data.replace("\r", ""))
239                    result = "{}{}".format(result, data.replace("\r", ""))
240                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
241                                 r"Ignored", data):
242                        start = time.time()
243                    if CTEST_END_SIGN in data:
244                        break
245                if (int(time.time()) - int(start)) > timeout:
246                    break
247            else:
248                result = "{}{}".format(
249                    result, data.replace("\r", "").replace("\n", "").strip())
250                if AT_CMD_ENDS in data:
251                    return result, True, ""
252                if (int(time.time()) - int(start)) > timeout:
253                    return result, False, ""
254
255        if receiver:
256            receiver.__done__()
257        LOG.info('Info: execute command success')
258        return result, True, ""
259
260    @staticmethod
261    def read_local_output(com=None, command=None, case_type="",
262                          timeout=TIMEOUT, receiver=None):
263        if case_type == DeviceTestType.ctest_lite:
264            return LiteHelper.read_local_output_ctest(com, command,
265                                                      timeout, receiver)
266        else:
267            return LiteHelper.read_local_output_test(com, command,
268                                                     timeout, receiver)
269
270    @staticmethod
271    def execute_local_cmd_with_timeout(com, **kwargs):
272        """
273        Execute command on the serial and read all the output from the serial.
274        """
275        args = kwargs
276        command = args.get("command", None)
277        input_command = command
278        case_type = args.get("case_type", "")
279        timeout = args.get("timeout", TIMEOUT)
280        receiver = args.get("receiver", None)
281        if not com:
282            raise LiteDeviceConnectError("local device is not connected.",
283                                         error_no="00402")
284
285        LOG.info("local_%s execute command shell %s with timeout %ss" %
286                 (convert_port(com.port), command, str(timeout)))
287
288        if isinstance(command, str):
289            command = command.encode("utf-8")
290            if command[-2:] != b"\r\n":
291                command = command.rstrip() + b'\r\n'
292                com.write(command)
293        else:
294            com.write(command)
295        return LiteHelper.read_local_output(
296            com, command=input_command, case_type=case_type, timeout=timeout,
297            receiver=receiver)
298
299    @staticmethod
300    def execute_local_command(com, command):
301        """
302        Execute command on the serial and read all the output from the serial.
303        """
304        if not com:
305            raise LiteDeviceConnectError("local device is not connected.",
306                                         error_no="00402")
307
308        LOG.info(
309            "local_%s execute command shell %s" % (convert_port(com.port),
310                                                   command))
311        command = command.encode("utf-8")
312        if command[-2:] != b"\r\n":
313            command = command.rstrip() + b'\r\n'
314            com.write(command)
315
316    @staticmethod
317    def execute_agent_cmd_with_timeout(self, command,
318                                       **kwargs):
319        import requests
320
321        base_url = self.base_url
322        headers = self.headers
323        local_source_dir = self.local_source_dir
324        target_save_path = self.target_save_path
325        args = kwargs
326        command_type = args.get("type", None)
327        sync = args.get("sync", 1)
328        response = None
329        try:
330            if "cmd" == command_type:
331                url = base_url + "/_processes/"
332                data = {
333                    'cmd': command,
334                    'sync': sync,
335                    'stdoutRedirect': 'file',
336                    'stderrRedirect': 'file'
337                }
338                response = requests.post(url=url, headers=headers,
339                                         data=json.dumps(data))
340                if response.status_code == 200:
341                    LOG.info("connect OK")
342                else:
343                    LOG.info("connect failed")
344                response.close()
345
346            elif "put" == command_type:
347                url = base_url + target_save_path + command
348                with open(local_source_dir + command, "rb") as data:
349                    response = requests.put(url=url, headers=headers,
350                                            data=data)
351                if response.status_code == 200:
352                    LOG.info("{} upload file to {} "
353                             "success".format(local_source_dir,
354                                              target_save_path))
355                else:
356                    LOG.info(
357                        "{} upload file to {} fail".format(local_source_dir,
358                                                           target_save_path))
359                response.close()
360            elif "get" == command_type:
361                url = base_url + target_save_path + command
362                response = requests.get(url=url, headers=headers, stream=True)
363                if response.status_code == 200:
364                    with open(local_source_dir + command + "bak", "wb") \
365                            as file_name:
366                        for file_data in response.iter_content(chunk_size=512):
367                            file_name.write(file_data)
368                    LOG.info("from {} download  file to {} success".format(
369                        target_save_path + command,
370                        local_source_dir))
371                else:
372                    LOG.info("{} download  file to {} fail".format(
373                        target_save_path + command,
374                        command,
375                        local_source_dir))
376            elif "delete" == command_type:
377                url = base_url + target_save_path + command
378                LOG.info(url)
379                response = requests.delete(url)
380                if response.status_code == 200:
381                    LOG.info("delete {}  file  success".format(
382                        target_save_path + command))
383                else:
384                    LOG.info("delete {}  file  fail".format(
385                        target_save_path + command))
386            else:
387                LOG.info("type error")
388        except Exception as error_message:
389            LOG.info("error_message:{}".format(error_message))
390        finally:
391            if response:
392                response.close()
393
394        return "", True, ""
395