1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import json 19import time 20import re 21 22from xdevice import DeviceTestType 23from xdevice import ExecuteTerminate 24from xdevice import platform_logger 25from ohos.exception import LiteDeviceTimeout 26from ohos.exception import LiteDeviceConnectError 27from ohos.exception import LiteDeviceExecuteCommandError 28 29 30__all__ = ["generate_report", "LiteHelper"] 31 32CPP_TEST_STANDARD_SIGN = "[==========]" 33CPP_TEST_END_SIGN = "Gtest xml output finished" 34CPP_SYS_STANDARD_SIGN = "OHOS #" 35CPP_ERR_MESSAGE = "[ERR]No such file or directory: " 36CTEST_STANDARD_SIGN = "Start to run test suite" 37AT_CMD_ENDS = "OK" 38CTEST_END_SIGN = "All the test suites finished" 39CPP_TEST_STOP_SIGN = "Test Stop" 40CPP_TEST_MOUNT_SIGN = "not mount properly" 41 42_START_JSUNIT_RUN_MARKER = "[start] start run suites" 43_END_JSUNIT_RUN_MARKER = "[end] run suites end" 44INSTALL_END_MARKER = "resultMessage is install success !" 45 46PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*') 47TIMEOUT = 90 48STATUS_OK_CODE = 200 49LOG = platform_logger("DmlibLite") 50 51 52def check_open_source_test(result_output): 53 if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \ 54 ("test pass" in result_output.lower() or 55 "test fail" in result_output.lower() or 56 "tests pass" in result_output.lower() or 57 "tests fail" in result_output.lower()): 58 return True 59 return False 60 61 62def check_read_test_end(result=None, input_command=None): 63 temp_result = result.replace("\n", "") 64 index = result.find(input_command) + len(input_command) 65 result_output = result[index:] 66 if input_command.startswith("./"): 67 if result_output.find(CPP_TEST_STANDARD_SIGN) != -1: 68 if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \ 69 result_output.find(CPP_TEST_END_SIGN) != -1: 70 return True 71 if check_open_source_test(result_output): 72 return True 73 if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \ 74 result_output.find(_END_JSUNIT_RUN_MARKER) >= 1: 75 return True 76 77 if result_output.find(INSTALL_END_MARKER) != -1: 78 return True 79 if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1 80 and result_output.find(CPP_TEST_STOP_SIGN) != -1): 81 LOG.info("Find test stop") 82 return True 83 if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output: 84 LOG.error("Execute file not exist, result is %s" % result_output, 85 error_no="00402") 86 raise LiteDeviceExecuteCommandError("execute file not exist", 87 error_no="00402") 88 elif input_command.startswith("zcat"): 89 return False 90 else: 91 if "OHOS #" in result_output or "# " in result_output: 92 if input_command == "reboot" or input_command == "reset": 93 return False 94 if input_command.startswith("mount"): 95 if "Mount nfs finished." not in result_output: 96 return False 97 return True 98 return False 99 100 101def generate_report(receiver, result): 102 if result and receiver: 103 if result: 104 receiver.__read__(result) 105 receiver.__done__() 106 107 108def get_current_time(): 109 current_time = time.time() 110 local_time = time.localtime(current_time) 111 data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time) 112 millisecond = (current_time - int(current_time)) * 1000 113 return "%s.%03d" % (data_head, millisecond) 114 115 116class LiteHelper: 117 @staticmethod 118 def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT, 119 receiver=None): 120 """ 121 Executes command on the device. 122 123 Parameters: 124 telnet: 125 command: the command to execute 126 timeout: timeout for read result 127 receiver: parser handler 128 """ 129 from xdevice import Scheduler 130 time.sleep(2) 131 start_time = time.time() 132 status = True 133 error_message = "" 134 result = "" 135 if not telnet: 136 raise LiteDeviceConnectError("remote device is not connected.", 137 error_no="00402") 138 139 telnet.write(command.encode('ascii') + b"\n") 140 while time.time() - start_time < timeout: 141 data = telnet.read_until(bytes(command, encoding="utf8"), 142 timeout=1) 143 data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace( 144 "\r", "") 145 result = "{}{}".format(result, data) 146 if command in result: 147 break 148 149 expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"), 150 bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"), 151 bytes(CPP_TEST_END_SIGN, encoding="utf8"), 152 bytes(CPP_TEST_STOP_SIGN, encoding="utf8")] 153 while time.time() - start_time < timeout: 154 if not Scheduler.is_execute: 155 raise ExecuteTerminate("Execute terminate", error_no="00300") 156 _, _, data = telnet.expect(expect_result, timeout=1) 157 data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace( 158 "\r", "") 159 result = "{}{}".format(result, data) 160 if receiver and data: 161 receiver.__read__(data) 162 if check_read_test_end(result, command): 163 break 164 else: 165 error_message = "execute %s timed out %s " % (command, timeout) 166 status = False 167 168 if receiver: 169 receiver.__done__() 170 171 if not status and command.startswith("uname"): 172 raise LiteDeviceTimeout("Execute command time out:%s" % command) 173 174 return result, status, error_message 175 176 @staticmethod 177 def read_local_output_test(com=None, command=None, timeout=TIMEOUT, 178 receiver=None): 179 input_command = command 180 linux_end_command = "" 181 if "--gtest_output=" in command: 182 linux_end_command = input_command.split(":")[1].split( 183 "reports")[0].rstrip("/") + " #" 184 error_message = "" 185 start_time = time.time() 186 result = "" 187 status = True 188 from xdevice import Scheduler 189 while time.time() - start_time < timeout: 190 if not Scheduler.is_execute: 191 raise ExecuteTerminate("Execute terminate", error_no="00300") 192 data = com.readline().decode('gbk', errors='ignore') 193 data = PATTERN.sub('', data).replace("\r", "") 194 result = "{}{}".format(result, data) 195 if receiver and data: 196 receiver.__read__(data) 197 if check_read_test_end(result, input_command): 198 break 199 else: 200 error_message = "execute %s timed out %s " % (command, timeout) 201 status = False 202 203 if receiver: 204 receiver.__done__() 205 206 if not status and command.startswith("uname"): 207 raise LiteDeviceTimeout("Execute command time out:%s" % command) 208 209 return result, status, error_message 210 211 @staticmethod 212 def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT, 213 receiver=None): 214 result = "" 215 input_command = command 216 217 start = time.time() 218 from xdevice import Scheduler 219 while True: 220 if not Scheduler.is_execute: 221 raise ExecuteTerminate("Execute terminate", error_no="00300") 222 data = com.readline().decode('gbk', errors='ignore') 223 data = PATTERN.sub('', data) 224 if isinstance(input_command, list): 225 if len(data.strip()) > 0: 226 data = "{} {}".format(get_current_time(), data) 227 if data and receiver: 228 receiver.__read__(data.replace("\r", "")) 229 result = "{}{}".format(result, data.replace("\r", "")) 230 if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+" 231 r"Ignored", data): 232 start = time.time() 233 if CTEST_END_SIGN in data: 234 break 235 if (int(time.time()) - int(start)) > timeout: 236 break 237 else: 238 result = "{}{}".format( 239 result, data.replace("\r", "").replace("\n", "").strip()) 240 if AT_CMD_ENDS in data: 241 return result, True, "" 242 if (int(time.time()) - int(start)) > timeout: 243 return result, False, "" 244 245 if receiver: 246 receiver.__done__() 247 LOG.info('Info: execute command success') 248 return result, True, "" 249 250 @staticmethod 251 def read_local_output(com=None, command=None, case_type="", 252 timeout=TIMEOUT, receiver=None): 253 if case_type == DeviceTestType.ctest_lite: 254 return LiteHelper.read_local_output_ctest(com, command, 255 timeout, receiver) 256 else: 257 return LiteHelper.read_local_output_test(com, command, 258 timeout, receiver) 259 260 @staticmethod 261 def execute_local_cmd_with_timeout(com, **kwargs): 262 """ 263 Execute command on the serial and read all the output from the serial. 264 """ 265 args = kwargs 266 command = args.get("command", None) 267 input_command = command 268 case_type = args.get("case_type", "") 269 timeout = args.get("timeout", TIMEOUT) 270 receiver = args.get("receiver", None) 271 if not com: 272 raise LiteDeviceConnectError("local device is not connected.", 273 error_no="00402") 274 275 LOG.info("local_%s execute command shell %s with timeout %ss" % 276 (com.port, command, str(timeout))) 277 278 if isinstance(command, str): 279 command = command.encode("utf-8") 280 if command[-2:] != b"\r\n": 281 command = command.rstrip() + b'\r\n' 282 com.write(command) 283 else: 284 com.write(command) 285 return LiteHelper.read_local_output( 286 com, command=input_command, case_type=case_type, timeout=timeout, 287 receiver=receiver) 288 289 @staticmethod 290 def execute_local_command(com, command): 291 """ 292 Execute command on the serial and read all the output from the serial. 293 """ 294 if not com: 295 raise LiteDeviceConnectError("local device is not connected.", 296 error_no="00402") 297 298 LOG.info( 299 "local_%s execute command shell %s" % (com.port, command)) 300 command = command.encode("utf-8") 301 if command[-2:] != b"\r\n": 302 command = command.rstrip() + b'\r\n' 303 com.write(command) 304