#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import time
import re

from xdevice import DeviceTestType
from xdevice import ExecuteTerminate
from xdevice import platform_logger
from ohos.exception import LiteDeviceTimeout
from ohos.exception import LiteDeviceConnectError
from ohos.exception import LiteDeviceExecuteCommandError

__all__ = ["generate_report", "LiteHelper"]

CPP_TEST_STANDARD_SIGN = "[==========]"
CPP_TEST_END_SIGN = "Gtest xml output finished"
CPP_SYS_STANDARD_SIGN = "OHOS #"
CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
CTEST_STANDARD_SIGN = "Start to run test suite"
AT_CMD_ENDS = "OK"
CTEST_END_SIGN = "All the test suites finished"
CPP_TEST_STOP_SIGN = "Test Stop"
CPP_TEST_MOUNT_SIGN = "not mount properly"

_START_JSUNIT_RUN_MARKER = "[start] start run suites"
_END_JSUNIT_RUN_MARKER = "[end] run suites end"
INSTALL_END_MARKER = "resultMessage is install success !"
PRODUCT_PARAMS_START = "To Obtain Product Params Start"
PRODUCT_PARAMS_END = "To Obtain Product Params End"

# Matches ANSI colour/SGR escape sequences so they can be stripped from
# device output before it is parsed.
PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
TIMEOUT = 90
STATUS_OK_CODE = 200
LOG = platform_logger("DmlibLite")

# Pause between polls of an idle serial port, so the read loop does not
# spin a CPU core while waiting for the device to produce output.
_SERIAL_POLL_INTERVAL = 0.01


def check_open_source_test(result_output):
    """
    Tell whether the output looks like a finished non-gtest test run.

    Parameters:
        result_output: text printed by the device after the command

    Returns:
        True when the gtest banner is absent and the text contains one of
        "test pass", "test fail", "tests pass" or "tests fail"
        (case-insensitive); False otherwise.
    """
    if CPP_TEST_STANDARD_SIGN in result_output:
        return False
    lowered = result_output.lower()
    verdicts = ("test pass", "test fail", "tests pass", "tests fail")
    return any(verdict in lowered for verdict in verdicts)


def check_read_test_end(result=None, input_command=None):
    """
    Decide whether the output collected so far shows the command is done.

    Parameters:
        result: all text read from the device so far
        input_command: the command string that was sent

    Returns:
        True when an end-of-command marker for this kind of command has
        been seen in the text following the command, False otherwise.

    Raises:
        LiteDeviceExecuteCommandError: a "./" command whose executable is
            reported missing by the device.
    """
    # Only the text after the first occurrence of the command is examined.
    # NOTE(review): when the command is not found, find() returns -1 and
    # the slice starts at len(input_command) - 1; kept as in the original.
    index = result.find(input_command) + len(input_command)
    result_output = result[index:]

    if input_command.startswith("./"):
        # gtest binary: finished once the banner was printed twice or the
        # xml output notice appeared.
        if CPP_TEST_STANDARD_SIGN in result_output:
            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
                    CPP_TEST_END_SIGN in result_output:
                return True
        if check_open_source_test(result_output):
            return True
        # NOTE(review): ">= 1" ignores a marker located at offset 0;
        # presumably "found" was intended. Kept as in the original.
        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
            return True
        if INSTALL_END_MARKER in result_output:
            return True
        if CPP_TEST_MOUNT_SIGN in result_output and \
                CPP_TEST_STOP_SIGN in result_output:
            LOG.info("Find test stop")
            return True
        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
            LOG.error("Execute file not exist, result is %s" % result_output,
                      error_no="00402")
            raise LiteDeviceExecuteCommandError("execute file not exist",
                                                error_no="00402")
    elif input_command.startswith("zcat"):
        return False
    elif input_command == "uname":
        # The original test was
        #   ("Linux" and "# " present) or "OHOS #" present or "#" present
        # and every alternative implies that "#" is present.
        if "#" in result_output:
            return True
    elif input_command.startswith("chmod +x") and \
            "query.bin" in input_command:
        # Membership test, not str.find(): find() returns -1 (truthy) when
        # the text is absent, which sent every "chmod +x" command here.
        if PRODUCT_PARAMS_END in result_output:
            return True
    else:
        if "OHOS #" in result_output or "# " in result_output:
            # A prompt seen right after reboot/reset is not a completion.
            if input_command == "reboot" or input_command == "reset":
                return False
            if input_command.startswith("mount"):
                if "Mount nfs finished." not in result_output:
                    return False
            return True
    return False


def generate_report(receiver, result):
    """
    Feed a complete result to the receiver and close it.

    Parameters:
        receiver: parser handler exposing __read__ and __done__
        result: text to pass to the receiver

    Nothing happens when either argument is falsy.
    """
    if result and receiver:
        receiver.__read__(result)
        receiver.__done__()


def get_current_time():
    """
    Return the local time formatted as "YYYY-mm-dd HH:MM:SS.mmm".
    """
    current_time = time.time()
    data_head = time.strftime("%Y-%m-%d %H:%M:%S",
                              time.localtime(current_time))
    millisecond = (current_time - int(current_time)) * 1000
    return "%s.%03d" % (data_head, millisecond)


def _encode_serial_command(command):
    """
    Encode a text command as UTF-8 bytes terminated by CRLF.
    """
    encoded = command.encode("utf-8")
    if encoded[-2:] != b"\r\n":
        encoded = encoded.rstrip() + b'\r\n'
    return encoded


class LiteHelper:
    """
    Static helpers that send commands to a lite device over telnet or a
    serial port and collect the output until an end marker or a timeout.
    """

    @staticmethod
    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
                                        receiver=None):
        """
        Executes command on the device.

        Parameters:
            telnet: connected telnet handle; write, read_until and expect
                are called on it
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message): the collected text, False as
            status when the read timed out, and the timeout message.

        Raises:
            LiteDeviceConnectError: telnet is falsy.
            ExecuteTerminate: the scheduler stopped executing.
            LiteDeviceTimeout: a command starting with "uname" timed out.
        """
        from xdevice import Scheduler
        # Fail fast: there is no point in sleeping when nothing is connected.
        if not telnet:
            raise LiteDeviceConnectError("remote device is not connected.",
                                         error_no="00402")
        time.sleep(2)
        start_time = time.time()
        status = True
        error_message = ""
        result = ""

        telnet.write(command.encode('ascii') + b"\n")
        # First wait until the device has echoed the command back.
        while time.time() - start_time < timeout:
            data = telnet.read_until(bytes(command, encoding="utf8"),
                                     timeout=1)
            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
                "\r", "")
            result = "{}{}".format(result, data)
            if command in result:
                break

        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
        # Then keep reading until the output shows the command finished.
        while time.time() - start_time < timeout:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            _, _, data = telnet.expect(expect_result, timeout=1)
            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
                "\r", "")
            result = "{}{}".format(result, data)
            if receiver and data:
                receiver.__read__(data)
            if check_read_test_end(result, command):
                break
        else:
            # Reached only when the loop ran out of time without a break.
            error_message = "execute %s timed out %s " % (command, timeout)
            status = False

        if receiver:
            receiver.__done__()

        if not status and command.startswith("uname"):
            raise LiteDeviceTimeout("Execute command time out:%s" % command)

        return result, status, error_message

    @staticmethod
    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
                               receiver=None):
        """
        Read serial output until the command is seen to finish.

        Parameters:
            com: serial handle; in_waiting and read are used
            command: the command string that was sent
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message): the collected text, False as
            status when the read timed out, and the timeout message.

        Raises:
            ExecuteTerminate: the scheduler stopped executing.
            LiteDeviceTimeout: a command starting with "uname" timed out.
        """
        input_command = command
        error_message = ""
        start_time = time.time()
        result = ""
        status = True
        from xdevice import Scheduler
        while time.time() - start_time < timeout:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            if com.in_waiting == 0:
                # Nothing buffered yet: yield briefly instead of spinning.
                time.sleep(_SERIAL_POLL_INTERVAL)
                continue
            data = com.read(com.in_waiting).decode('gbk', errors='ignore')
            data = PATTERN.sub('', data).replace("\r", "")
            result = "{}{}".format(result, data)
            if receiver and data:
                receiver.__read__(data)
            if check_read_test_end(result, input_command):
                break
        else:
            # Reached only when the loop ran out of time without a break.
            error_message = "execute %s timed out %s " % (command, timeout)
            status = False

        if receiver:
            receiver.__done__()

        if not status and command.startswith("uname"):
            raise LiteDeviceTimeout("Execute command time out:%s" % command)

        return result, status, error_message

    @staticmethod
    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
                                receiver=None):
        """
        Read serial output line by line for a ctest run or a plain command.

        When command is a list, every non-blank line is prefixed with a
        timestamp, passed to the receiver, and reading stops at the
        end-of-suites marker or after timeout seconds without a test
        summary line. Otherwise lines are concatenated until one contains
        "OK" or the timeout elapses.

        Parameters:
            com: serial handle; readline is used
            command: the command that was sent (list or other)
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message); error_message is always "".

        Raises:
            ExecuteTerminate: the scheduler stopped executing.
        """
        result = ""
        input_command = command

        start = time.time()
        from xdevice import Scheduler
        while True:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            data = com.readline().decode('gbk', errors='ignore')
            data = PATTERN.sub('', data)
            if isinstance(input_command, list):
                if len(data.strip()) > 0:
                    data = "{} {}".format(get_current_time(), data)
                    if data and receiver:
                        receiver.__read__(data.replace("\r", ""))
                    result = "{}{}".format(result, data.replace("\r", ""))
                    # A per-suite summary line restarts the timeout window.
                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
                                 r"Ignored", data):
                        start = time.time()
                    if CTEST_END_SIGN in data:
                        break
                # NOTE(review): a timeout here still falls through to the
                # success return below; kept as in the original.
                if (int(time.time()) - int(start)) > timeout:
                    break
            else:
                result = "{}{}".format(
                    result, data.replace("\r", "").replace("\n", "").strip())
                if AT_CMD_ENDS in data:
                    return result, True, ""
                if (int(time.time()) - int(start)) > timeout:
                    return result, False, ""

        if receiver:
            receiver.__done__()
        LOG.info('Info: execute command success')
        return result, True, ""

    @staticmethod
    def read_local_output(com=None, command=None, case_type="",
                          timeout=TIMEOUT, receiver=None):
        """
        Dispatch to the ctest reader or the generic reader by case_type.

        Returns:
            the (result, status, error_message) tuple of the chosen reader.
        """
        if case_type == DeviceTestType.ctest_lite:
            return LiteHelper.read_local_output_ctest(com, command,
                                                      timeout, receiver)
        return LiteHelper.read_local_output_test(com, command,
                                                 timeout, receiver)

    @staticmethod
    def execute_local_cmd_with_timeout(com, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Keyword arguments:
            command: the command to send; a str is encoded as UTF-8 and
                terminated with CRLF, anything else is written unchanged
            case_type: test type used to select the reader
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            the (result, status, error_message) tuple of the reader.

        Raises:
            LiteDeviceConnectError: com is falsy.
        """
        command = kwargs.get("command", None)
        case_type = kwargs.get("case_type", "")
        timeout = kwargs.get("timeout", TIMEOUT)
        receiver = kwargs.get("receiver", None)
        if not com:
            raise LiteDeviceConnectError("local device is not connected.",
                                         error_no="00402")

        LOG.info("local_%s execute command shell %s with timeout %ss" %
                 (com.port, command, str(timeout)))

        if isinstance(command, str):
            com.write(_encode_serial_command(command))
        else:
            com.write(command)
        # The reader receives the command as passed in, not the encoded
        # bytes, because end detection works on the decoded text.
        return LiteHelper.read_local_output(
            com, command=command, case_type=case_type, timeout=timeout,
            receiver=receiver)

    @staticmethod
    def execute_local_command(com, command):
        """
        Write a command to the serial port without reading the output.

        Parameters:
            com: serial handle; port and write are used
            command: the command string to send

        Raises:
            LiteDeviceConnectError: com is falsy.
        """
        if not com:
            raise LiteDeviceConnectError("local device is not connected.",
                                         error_no="00402")

        LOG.info(
            "local_%s execute command shell %s" % (com.port, command))
        com.write(_encode_serial_command(command))