1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import json 19import time 20import re 21 22from xdevice import DeviceTestType 23from xdevice import ExecuteTerminate 24from xdevice import platform_logger 25from ohos.exception import LiteDeviceTimeout 26from ohos.exception import LiteDeviceConnectError 27from ohos.exception import LiteDeviceExecuteCommandError 28 29 30__all__ = ["generate_report", "LiteHelper"] 31 32CPP_TEST_STANDARD_SIGN = "[==========]" 33CPP_TEST_END_SIGN = "Gtest xml output finished" 34CPP_SYS_STANDARD_SIGN = "OHOS #" 35CPP_ERR_MESSAGE = "[ERR]No such file or directory: " 36CTEST_STANDARD_SIGN = "Start to run test suite" 37AT_CMD_ENDS = "OK" 38CTEST_END_SIGN = "All the test suites finished" 39CPP_TEST_STOP_SIGN = "Test Stop" 40CPP_TEST_MOUNT_SIGN = "not mount properly" 41 42_START_JSUNIT_RUN_MARKER = "[start] start run suites" 43_END_JSUNIT_RUN_MARKER = "[end] run suites end" 44INSTALL_END_MARKER = "resultMessage is install success !" 45 46PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*') 47TIMEOUT = 90 48STATUS_OK_CODE = 200 49LOG = platform_logger("DmlibLite") 50 51 52def check_open_source_test(result_output): 53 if result_output.find(CPP_TEST_STANDARD_SIGN) == -1 and \ 54 ("test pass" in result_output.lower() or 55 "test fail" in result_output.lower() or 56 "tests pass" in result_output.lower() or 57 "tests fail" in result_output.lower()): 58 return True 59 return False 60 61 62def check_read_test_end(result=None, input_command=None): 63 temp_result = result.replace("\n", "") 64 index = result.find(input_command) + len(input_command) 65 result_output = result[index:] 66 if input_command.startswith("./"): 67 if result_output.find(CPP_TEST_STANDARD_SIGN) != -1: 68 if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \ 69 result_output.find(CPP_TEST_END_SIGN) != -1: 70 return True 71 if check_open_source_test(result_output): 72 return True 73 if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \ 74 result_output.find(_END_JSUNIT_RUN_MARKER) >= 1: 75 return True 76 77 if result_output.find(INSTALL_END_MARKER) != -1: 78 return True 79 if (result_output.find(CPP_TEST_MOUNT_SIGN) != -1 80 and result_output.find(CPP_TEST_STOP_SIGN) != -1): 81 LOG.info("Find test stop") 82 return True 83 if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output: 84 LOG.error("Execute file not exist, result is %s" % result_output, 85 error_no="00402") 86 raise LiteDeviceExecuteCommandError("execute file not exist", 87 error_no="00402") 88 elif input_command.startswith("zcat"): 89 return False 90 else: 91 if "OHOS #" in result_output or "# " in result_output: 92 if input_command == "reboot" or input_command == "reset": 93 return False 94 if input_command.startswith("mount"): 95 if "Mount nfs finished." not in result_output: 96 return False 97 return True 98 return False 99 100 101def generate_report(receiver, result): 102 if result and receiver: 103 if result: 104 receiver.__read__(result) 105 receiver.__done__() 106 107 108def get_current_time(): 109 current_time = time.time() 110 local_time = time.localtime(current_time) 111 data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time) 112 millisecond = (current_time - int(current_time)) * 1000 113 return "%s.%03d" % (data_head, millisecond) 114 115 116class LiteHelper: 117 @staticmethod 118 def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT, 119 receiver=None): 120 """ 121 Executes command on the device. 122 123 Parameters: 124 telnet: 125 command: the command to execute 126 timeout: timeout for read result 127 receiver: parser handler 128 """ 129 from xdevice import Scheduler 130 time.sleep(2) 131 start_time = time.time() 132 status = True 133 error_message = "" 134 result = "" 135 if not telnet: 136 raise LiteDeviceConnectError("remote device is not connected.", 137 error_no="00402") 138 139 telnet.write(command.encode('ascii') + b"\n") 140 while time.time() - start_time < timeout: 141 data = telnet.read_until(bytes(command, encoding="utf8"), 142 timeout=1) 143 data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace( 144 "\r", "") 145 result = "{}{}".format(result, data) 146 if command in result: 147 break 148 149 expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"), 150 bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"), 151 bytes(CPP_TEST_END_SIGN, encoding="utf8"), 152 bytes(CPP_TEST_STOP_SIGN, encoding="utf8")] 153 while time.time() - start_time < timeout: 154 if not Scheduler.is_execute: 155 raise ExecuteTerminate("Execute terminate", error_no="00300") 156 _, _, data = telnet.expect(expect_result, timeout=1) 157 data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace( 158 "\r", "") 159 result = "{}{}".format(result, data) 160 if receiver and data: 161 receiver.__read__(data) 162 if check_read_test_end(result, command): 163 break 164 else: 165 error_message = "execute %s timed out %s " % (command, timeout) 166 status = False 167 168 if receiver: 169 receiver.__done__() 170 171 if not status and command.startswith("uname"): 172 raise LiteDeviceTimeout("Execute command time out:%s" % command) 173 174 return result, status, error_message 175 176 @staticmethod 177 def read_local_output_test(com=None, command=None, timeout=TIMEOUT, 178 receiver=None): 179 input_command = command 180 linux_end_command = "" 181 if "--gtest_output=" in command: 182 linux_end_command = input_command.split(":")[1].split( 183 "reports")[0].rstrip("/") + " #" 184 error_message = "" 185 start_time = time.time() 186 result = "" 187 status = True 188 from xdevice import Scheduler 189 while time.time() - start_time < timeout: 190 if not Scheduler.is_execute: 191 raise ExecuteTerminate("Execute terminate", error_no="00300") 192 if com.in_waiting == 0: 193 continue 194 data = com.read(com.in_waiting).decode('gbk', errors='ignore') 195 data = PATTERN.sub('', data).replace("\r", "") 196 result = "{}{}".format(result, data) 197 if receiver and data: 198 receiver.__read__(data) 199 if check_read_test_end(result, input_command): 200 break 201 else: 202 error_message = "execute %s timed out %s " % (command, timeout) 203 status = False 204 205 if receiver: 206 receiver.__done__() 207 208 if not status and command.startswith("uname"): 209 raise LiteDeviceTimeout("Execute command time out:%s" % command) 210 211 return result, status, error_message 212 213 @staticmethod 214 def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT, 215 receiver=None): 216 result = "" 217 input_command = command 218 219 start = time.time() 220 from xdevice import Scheduler 221 while True: 222 if not Scheduler.is_execute: 223 raise ExecuteTerminate("Execute terminate", error_no="00300") 224 data = com.readline().decode('gbk', errors='ignore') 225 data = PATTERN.sub('', data) 226 if isinstance(input_command, list): 227 if len(data.strip()) > 0: 228 data = "{} {}".format(get_current_time(), data) 229 if data and receiver: 230 receiver.__read__(data.replace("\r", "")) 231 result = "{}{}".format(result, data.replace("\r", "")) 232 if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+" 233 r"Ignored", data): 234 start = time.time() 235 if CTEST_END_SIGN in data: 236 break 237 if (int(time.time()) - int(start)) > timeout: 238 break 239 else: 240 result = "{}{}".format( 241 result, data.replace("\r", "").replace("\n", "").strip()) 242 if AT_CMD_ENDS in data: 243 return result, True, "" 244 if (int(time.time()) - int(start)) > timeout: 245 return result, False, "" 246 247 if receiver: 248 receiver.__done__() 249 LOG.info('Info: execute command success') 250 return result, True, "" 251 252 @staticmethod 253 def read_local_output(com=None, command=None, case_type="", 254 timeout=TIMEOUT, receiver=None): 255 if case_type == DeviceTestType.ctest_lite: 256 return LiteHelper.read_local_output_ctest(com, command, 257 timeout, receiver) 258 else: 259 return LiteHelper.read_local_output_test(com, command, 260 timeout, receiver) 261 262 @staticmethod 263 def execute_local_cmd_with_timeout(com, **kwargs): 264 """ 265 Execute command on the serial and read all the output from the serial. 266 """ 267 args = kwargs 268 command = args.get("command", None) 269 input_command = command 270 case_type = args.get("case_type", "") 271 timeout = args.get("timeout", TIMEOUT) 272 receiver = args.get("receiver", None) 273 if not com: 274 raise LiteDeviceConnectError("local device is not connected.", 275 error_no="00402") 276 277 LOG.info("local_%s execute command shell %s with timeout %ss" % 278 (com.port, command, str(timeout))) 279 280 if isinstance(command, str): 281 command = command.encode("utf-8") 282 if command[-2:] != b"\r\n": 283 command = command.rstrip() + b'\r\n' 284 com.write(command) 285 else: 286 com.write(command) 287 return LiteHelper.read_local_output( 288 com, command=input_command, case_type=case_type, timeout=timeout, 289 receiver=receiver) 290 291 @staticmethod 292 def execute_local_command(com, command): 293 """ 294 Execute command on the serial and read all the output from the serial. 295 """ 296 if not com: 297 raise LiteDeviceConnectError("local device is not connected.", 298 error_no="00402") 299 300 LOG.info( 301 "local_%s execute command shell %s" % (com.port, command)) 302 command = command.encode("utf-8") 303 if command[-2:] != b"\r\n": 304 command = command.rstrip() + b'\r\n' 305 com.write(command) 306