1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import os 19import sys 20import time 21import re 22 23from xdevice import DeviceTestType 24from xdevice import FilePermission 25from xdevice import HcpTestMode 26from xdevice import convert_mac 27from xdevice import ExecuteTerminate 28from xdevice import platform_logger 29from ohos.error import ErrorMessage 30from ohos.exception import LiteDeviceTimeout 31from ohos.exception import LiteDeviceConnectError 32from ohos.exception import LiteDeviceExecuteCommandError 33 34__all__ = ["generate_report", "LiteHelper"] 35 36CPP_TEST_STANDARD_SIGN = "[==========]" 37CPP_TEST_END_SIGN = "Gtest xml output finished" 38CPP_SYS_STANDARD_SIGN = "OHOS #" 39CPP_ERR_MESSAGE = "[ERR]No such file or directory: " 40CTEST_STANDARD_SIGN = "Start to run test suite" 41AT_CMD_ENDS = "OK" 42CTEST_END_SIGN = "All the test suites finished" 43CPP_TEST_STOP_SIGN = "Test Stop" 44CPP_TEST_MOUNT_SIGN = "not mount properly" 45 46_START_JSUNIT_RUN_MARKER = "[start] start run suites" 47_END_JSUNIT_RUN_MARKER = "[end] run suites end" 48INSTALL_END_MARKER = "resultMessage is install success !" 49PRODUCT_PARAMS_START = "To Obtain Product Params Start" 50PRODUCT_PARAMS_END = "To Obtain Product Params End" 51 52PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*') 53TIMEOUT = 90 54STATUS_OK_CODE = 200 55LOG = platform_logger("DmlibLite") 56 57 58def check_open_source_test(result_output): 59 if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \ 60 ("test pass" in result_output.lower() or 61 "test fail" in result_output.lower() or 62 "tests pass" in result_output.lower() or 63 "tests fail" in result_output.lower()): 64 return True 65 return False 66 67 68def check_read_test_end(result=None, input_command=None): 69 temp_result = result.replace("\n", "") 70 index = result.find(input_command) + len(input_command) 71 result_output = result[index:] 72 if input_command.startswith("./"): 73 if result_output.find(CPP_TEST_STANDARD_SIGN) != -1: 74 if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \ 75 result_output.find(CPP_TEST_END_SIGN) != -1: 76 return True 77 if check_open_source_test(result_output): 78 return True 79 if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \ 80 result_output.find(_END_JSUNIT_RUN_MARKER) >= 1: 81 return True 82 83 if result_output.find(INSTALL_END_MARKER) != -1: 84 return True 85 if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1 86 and result_output.find(CPP_TEST_STOP_SIGN) != -1): 87 LOG.info("Find test stop") 88 return True 89 if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output: 90 LOG.error("Execute file not exist, result is %s" % result_output, 91 error_no="00402") 92 raise LiteDeviceExecuteCommandError(ErrorMessage.Common.Code_0301005) 93 elif input_command.startswith("zcat"): 94 return False 95 elif input_command == "uname": 96 is_linux_uname = "Linux" in result_output and "# " in result_output 97 is_liteos_uname = "OHOS #" in result_output or "# " in result_output 98 if is_linux_uname or is_liteos_uname: 99 return True 100 elif input_command.startswith("chmod +x") and input_command.find("query.bin"): 101 if PRODUCT_PARAMS_END in result_output: 102 return True 103 else: 104 if "OHOS #" in result_output or "# " in result_output: 105 if input_command == "reboot" or input_command == "reset": 106 return False 107 if input_command.startswith("mount"): 108 if "Mount nfs finished." not in result_output: 109 return False 110 return True 111 return False 112 113 114def generate_report(receiver, result): 115 if result and receiver: 116 if result: 117 receiver.__read__(result) 118 receiver.__done__() 119 120 121def get_current_time(): 122 current_time = time.time() 123 local_time = time.localtime(current_time) 124 data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time) 125 millisecond = (current_time - int(current_time)) * 1000 126 return "%s.%03d" % (data_head, millisecond) 127 128 129class LiteHelper: 130 @staticmethod 131 def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT, 132 receiver=None): 133 """ 134 Executes command on the device. 135 136 Parameters: 137 telnet: 138 command: the command to execute 139 timeout: timeout for read result 140 receiver: parser handler 141 """ 142 from xdevice import Binder 143 time.sleep(2) 144 start_time = time.time() 145 status = True 146 error_message = "" 147 result = "" 148 if not telnet: 149 raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303018) 150 151 telnet.write(command.encode('ascii') + b"\n") 152 while time.time() - start_time < timeout: 153 data = telnet.read_until(bytes(command, encoding="utf8"), 154 timeout=1) 155 data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace( 156 "\r", "") 157 result = "{}{}".format(result, data) 158 if command in result: 159 break 160 161 expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"), 162 bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"), 163 bytes(CPP_TEST_END_SIGN, encoding="utf8"), 164 bytes(CPP_TEST_STOP_SIGN, encoding="utf8")] 165 while time.time() - start_time < timeout: 166 if not Binder.is_executing(): 167 raise ExecuteTerminate(ErrorMessage.Common.Code_0301013) 168 _, _, data = telnet.expect(expect_result, timeout=1) 169 data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace( 170 "\r", "") 171 result = "{}{}".format(result, data) 172 if receiver and data: 173 receiver.__read__(data) 174 if check_read_test_end(result, command): 175 break 176 else: 177 error_message = "execute %s timed out %s " % (command, timeout) 178 status = False 179 180 if receiver: 181 receiver.__done__() 182 183 if not status and command.startswith("uname"): 184 raise LiteDeviceTimeout(ErrorMessage.Device.Code_0303014.format(command)) 185 186 return result, status, error_message 187 188 @staticmethod 189 def read_local_output_test(com=None, command=None, timeout=TIMEOUT, 190 receiver=None): 191 input_command = command 192 start_time = time.time() 193 # 增加对执行测试套命令时 194 if HcpTestMode.hcp_mode and input_command.startswith("./") and input_command.find(".bin") and ( 195 "--gtest_list_tests" not in input_command): 196 device_log_file = getattr(sys, "device_log_file", "") 197 LOG.info("device_log_file:{}".format(device_log_file)) 198 result, status, error_message = LiteHelper.parser_hcp_test_suite_execute_log(com, start_time, timeout, 199 input_command, device_log_file) 200 else: 201 # 非测试套执行命令 202 result, status, error_message = LiteHelper.parser_common_execute(com, start_time, timeout, input_command, 203 receiver) 204 205 if receiver: 206 receiver.__done__() 207 208 if not status and command.startswith("uname"): 209 raise LiteDeviceTimeout(ErrorMessage.Device.Code_0303014.format(command)) 210 return result, status, error_message 211 212 @staticmethod 213 def parser_hcp_test_suite_execute_log(com, start_time, timeout, input_command, device_log_file): 214 from xdevice import Binder 215 result = "" 216 error_message = "" 217 status = True 218 # 增加对执行测试套命令时 219 device_log_file_open = os.open(device_log_file, os.O_WRONLY | 220 os.O_CREAT | os.O_APPEND, 221 FilePermission.mode_755) 222 try: 223 with os.fdopen(device_log_file_open, "a") as file_name: 224 while time.time() - start_time < timeout: 225 if not Binder.is_executing(): 226 raise ExecuteTerminate(ErrorMessage.Common.Code_0301013) 227 if com.in_waiting == 0: 228 continue 229 data = com.read(com.in_waiting).decode('gbk', errors='ignore') 230 data = PATTERN.sub('', data).replace("\r", "") 231 data = convert_mac(data) 232 file_name.write(data) 233 file_name.flush() 234 time_diff = time.time() - start_time 235 if timeout - 60 <= time_diff: 236 if LiteHelper.check_string_in_file(device_log_file, CPP_TEST_END_SIGN): 237 break 238 if CPP_TEST_END_SIGN in data or "Gtest xml" in data or "output finished" in data: 239 LOG.info("parser hcptest end:{}".format(data)) 240 break 241 else: 242 error_message = "execute {} timed out {} ".format(input_command, timeout) 243 status = False 244 except (UnicodeDecodeError, IOError, OSError) as exception: 245 if not getattr(exception, "error_no", ""): 246 setattr(exception, "error_no", "03403") 247 LOG.exception(exception, exc_info=True, error_no="03403") 248 raise exception 249 except Exception as exception: 250 exception_error = "run parser hcptest suite execute log error:{}".format(exception) 251 if not getattr(exception, "error_no", ""): 252 setattr(exception, "error_no", "03403") 253 LOG.exception(exception_error, exc_info=True, error_no="03403") 254 raise exception 255 finally: 256 LOG.debug("run parser hcptest end") 257 return " ", status, error_message 258 259 @staticmethod 260 def check_string_in_file(file_path, target_string): 261 try: 262 with open(file_path, 'r') as file: 263 for line in file: 264 if target_string in line: 265 LOG.info("{} is exist!".format(target_string)) 266 return True 267 return False 268 except FileNotFoundError: 269 LOG.info("{} doesn't exist!".format(target_string)) 270 return False 271 272 @staticmethod 273 def parser_common_execute(com, start_time, timeout, input_command, receiver): 274 from xdevice import Binder 275 result = "" 276 error_message = "" 277 status = True 278 while time.time() - start_time < timeout: 279 if not Binder.is_executing(): 280 raise ExecuteTerminate(ErrorMessage.Common.Code_0301013) 281 if com.in_waiting == 0: 282 continue 283 data = com.read(com.in_waiting).decode('gbk', errors='ignore') 284 data = PATTERN.sub('', data).replace("\r", "") 285 result = "{}{}".format(result, data) 286 if receiver and data: 287 receiver.__read__(data) 288 if check_read_test_end(result, input_command): 289 break 290 else: 291 error_message = "execute %s timed out %s " % (input_command, timeout) 292 status = False 293 return result, status, error_message 294 295 @staticmethod 296 def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT, 297 receiver=None): 298 result = "" 299 input_command = command 300 301 start = time.time() 302 from xdevice import Binder 303 while True: 304 if not Binder.is_executing(): 305 raise ExecuteTerminate(ErrorMessage.Common.Code_0301013) 306 data = com.readline().decode('gbk', errors='ignore') 307 data = PATTERN.sub('', data) 308 if isinstance(input_command, list): 309 if len(data.strip()) > 0: 310 data = "{} {}".format(get_current_time(), data) 311 if data and receiver: 312 receiver.__read__(data.replace("\r", "")) 313 result = "{}{}".format(result, data.replace("\r", "")) 314 if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+" 315 r"Ignored", data): 316 start = time.time() 317 if CTEST_END_SIGN in data: 318 break 319 if (int(time.time()) - int(start)) > timeout: 320 break 321 else: 322 result = "{}{}".format( 323 result, data.replace("\r", "").replace("\n", "").strip()) 324 if AT_CMD_ENDS in data: 325 return result, True, "" 326 if (int(time.time()) - int(start)) > timeout: 327 return result, False, "" 328 329 if receiver: 330 receiver.__done__() 331 LOG.info('Info: execute command success') 332 return result, True, "" 333 334 @staticmethod 335 def read_local_output(com=None, command=None, case_type="", 336 timeout=TIMEOUT, receiver=None): 337 if case_type == DeviceTestType.ctest_lite: 338 return LiteHelper.read_local_output_ctest(com, command, 339 timeout, receiver) 340 else: 341 return LiteHelper.read_local_output_test(com, command, 342 timeout, receiver) 343 344 @staticmethod 345 def execute_local_cmd_with_timeout(com, **kwargs): 346 """ 347 Execute command on the serial and read all the output from the serial. 348 """ 349 args = kwargs 350 command = args.get("command", None) 351 input_command = command 352 case_type = args.get("case_type", "") 353 timeout = args.get("timeout", TIMEOUT) 354 receiver = args.get("receiver", None) 355 if not com: 356 raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303010) 357 358 LOG.info("local_{} execute command shell {} with " 359 "timeout {}s".format(com.port, convert_mac(command), str(timeout))) 360 361 if isinstance(command, str): 362 command = command.encode("utf-8") 363 if command[-2:] != b"\r\n": 364 command = command.rstrip() + b'\r\n' 365 com.write(command) 366 else: 367 com.write(command) 368 return LiteHelper.read_local_output( 369 com, command=input_command, case_type=case_type, timeout=timeout, 370 receiver=receiver) 371 372 @staticmethod 373 def execute_local_command(com, command): 374 """ 375 Execute command on the serial and read all the output from the serial. 376 """ 377 if not com: 378 raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303010) 379 380 LOG.info( 381 "local_%s execute command shell %s" % (com.port, convert_mac(command))) 382 command = command.encode("utf-8") 383 if command[-2:] != b"\r\n": 384 command = command.rstrip() + b'\r\n' 385 com.write(command) 386