#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import time
import re

from _core.constants import DeviceTestType
from _core.exception import ExecuteTerminate
from _core.exception import LiteDeviceTimeout
from _core.exception import LiteDeviceConnectError
from _core.exception import LiteDeviceExecuteCommandError
from _core.logger import platform_logger
from _core.utils import convert_port

__all__ = ["generate_report", "LiteHelper"]

# Markers searched for in device output to decide that a command finished.
CPP_TEST_STANDARD_SIGN = "[==========]"
CPP_TEST_END_SIGN = "Gtest xml output finished"
CPP_SYS_STANDARD_SIGN = "OHOS #"
CPP_ERR_MESSAGE = "[ERR]No such file or directory: "
CTEST_STANDARD_SIGN = "Start to run test suite"
AT_CMD_ENDS = "OK"
CTEST_END_SIGN = "All the test suites finished"
CPP_TEST_STOP_SIGN = "Test Stop"
CPP_TEST_MOUNT_SIGN = "not mount properly"

_START_JSUNIT_RUN_MARKER = "[start] start run suites"
_END_JSUNIT_RUN_MARKER = "[end] run suites end"
INSTALL_END_MARKER = "resultMessage is install success !"

# Matches ANSI colour (SGR) escape sequences so they can be stripped.
PATTERN = re.compile(r'\x1B(\[([0-9]{1,2}(;[0-9]{1,2})*)?m)*')
# Default number of seconds to wait for a command to finish.
TIMEOUT = 90
LOG = platform_logger("DmlibLite")


def check_open_source_test(result_output):
    """
    Return True when the output reports "test(s) pass/fail" (checked
    case-insensitively) and does not contain CPP_TEST_STANDARD_SIGN.
    """
    if CPP_TEST_STANDARD_SIGN in result_output:
        return False
    lowered = result_output.lower()
    return any(sign in lowered for sign in
               ("test pass", "test fail", "tests pass", "tests fail"))


def check_read_test_end(result=None, input_command=None):
    """
    Decide whether the output collected so far shows the command finished.

    Parameters:
        result: all text read from the device so far
        input_command: the command that was sent

    Returns:
        True when an end marker for this kind of command was found in the
        text following the command, otherwise False.

    Raises:
        LiteDeviceExecuteCommandError: the device reported that the
            executable of a "./" command does not exist.
    """
    # Only the text after the command is inspected.
    # NOTE(review): when the command is not present, find() returns -1 and
    # the slice starts at len(input_command) - 1. This is kept unchanged
    # because starting at 0 could let a prompt received before the command
    # satisfy the "# " check below - confirm the intended behaviour.
    index = result.find(input_command) + len(input_command)
    result_output = result[index:]

    if input_command.startswith("./"):
        if CPP_TEST_STANDARD_SIGN in result_output:
            if result_output.count(CPP_TEST_STANDARD_SIGN) == 2 or \
                    CPP_TEST_END_SIGN in result_output:
                return True
        if check_open_source_test(result_output):
            return True
        # ">= 1" (not ">= 0"): a marker at offset 0 is not accepted.
        if result_output.find(_START_JSUNIT_RUN_MARKER) >= 1 and \
                result_output.find(_END_JSUNIT_RUN_MARKER) >= 1:
            return True
        if INSTALL_END_MARKER in result_output:
            return True
        if CPP_TEST_MOUNT_SIGN in result_output and \
                CPP_TEST_STOP_SIGN in result_output:
            LOG.info("find test stop")
            return True
        if "%s%s" % (CPP_ERR_MESSAGE, input_command[2:]) in result_output:
            LOG.error("execute file not exist, result is %s" % result_output,
                      error_no="00402")
            raise LiteDeviceExecuteCommandError("execute file not exist",
                                                error_no="00402")
        return False

    if input_command.startswith("zcat"):
        # zcat is never reported as finished; the caller's timeout ends it.
        return False

    if "OHOS #" in result_output or "# " in result_output:
        if input_command in ("reboot", "reset"):
            return False
        if input_command.startswith("mount") and \
                "Mount nfs finished." not in result_output:
            return False
        return True
    return False


def generate_report(receiver, result):
    """
    Feed result to receiver and mark it done.

    Nothing happens when either argument is falsy.
    """
    if result and receiver:
        receiver.__read__(result)
        receiver.__done__()


def get_current_time():
    """
    Return the local time formatted as "YYYY-mm-dd HH:MM:SS.mmm".
    """
    current_time = time.time()
    local_time = time.localtime(current_time)
    data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
    millisecond = (current_time - int(current_time)) * 1000
    return "%s.%03d" % (data_head, millisecond)


class LiteHelper:
    """
    Static helpers that send commands to a lite device over telnet, a
    serial port or an HTTP agent and collect the output.
    """

    @staticmethod
    def execute_remote_cmd_with_timeout(telnet, command="", timeout=TIMEOUT,
                                        receiver=None):
        """
        Executes command on the device.

        Parameters:
            telnet: connected telnet object (write/read_until/expect are
                used)
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message); status is False and
            error_message is set when the timeout was reached.

        Raises:
            LiteDeviceConnectError: telnet is falsy.
            ExecuteTerminate: Scheduler.is_execute became falsy.
            LiteDeviceTimeout: a command starting with "uname" timed out.
        """
        # Imported locally, presumably to avoid a circular import - confirm.
        from xdevice import Scheduler

        # Fail fast: no point in sleeping when there is no connection.
        if not telnet:
            raise LiteDeviceConnectError("remote device is not connected.",
                                         error_no="00402")
        time.sleep(2)
        start_time = time.time()
        status = True
        error_message = ""
        result = ""

        telnet.write(command.encode('ascii') + b"\n")
        # Phase 1: read until the command itself shows up in the output.
        while time.time() - start_time < timeout:
            data = telnet.read_until(bytes(command, encoding="utf8"),
                                     timeout=1)
            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
                "\r", "")
            result = "{}{}".format(result, data)
            if command in result:
                break

        expect_result = [bytes(CPP_TEST_STANDARD_SIGN, encoding="utf8"),
                         bytes(CPP_SYS_STANDARD_SIGN, encoding="utf8"),
                         bytes(CPP_TEST_END_SIGN, encoding="utf8"),
                         bytes(CPP_TEST_STOP_SIGN, encoding="utf8")]
        # Phase 2: collect output until an end condition is detected.
        while time.time() - start_time < timeout:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            _, _, data = telnet.expect(expect_result, timeout=1)
            data = PATTERN.sub('', data.decode('gbk', 'ignore')).replace(
                "\r", "")
            result = "{}{}".format(result, data)
            if receiver and data:
                receiver.__read__(data)
            if check_read_test_end(result, command):
                break
        else:
            # Loop ended without break: the timeout was reached.
            error_message = "execute %s timed out %s " % (command, timeout)
            status = False

        if receiver:
            receiver.__done__()

        if not status and command.startswith("uname"):
            raise LiteDeviceTimeout("Execute command time out:%s" % command)

        return result, status, error_message

    @staticmethod
    def read_local_output_test(com=None, command=None, timeout=TIMEOUT,
                               receiver=None):
        """
        Read the serial output of a command until it finishes or times out.

        Parameters:
            com: serial object (readline is used)
            command: the command that was sent
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            (result, status, error_message); status is False and
            error_message is set when the timeout was reached.

        Raises:
            ExecuteTerminate: Scheduler.is_execute became falsy.
            LiteDeviceTimeout: a command starting with "uname" timed out.
        """
        # Imported locally, presumably to avoid a circular import - confirm.
        from xdevice import Scheduler

        error_message = ""
        start_time = time.time()
        result = ""
        status = True

        while time.time() - start_time < timeout:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            data = com.readline().decode('gbk', errors='ignore')
            data = PATTERN.sub('', data).replace("\r", "")
            result = "{}{}".format(result, data)
            if receiver and data:
                receiver.__read__(data)
            if check_read_test_end(result, command):
                break
        else:
            # Loop ended without break: the timeout was reached.
            error_message = "execute %s timed out %s " % (command, timeout)
            status = False

        if receiver:
            receiver.__done__()

        if not status and command.startswith("uname"):
            raise LiteDeviceTimeout("Execute command time out:%s" % command)

        return result, status, error_message

    @staticmethod
    def read_local_output_ctest(com=None, command=None, timeout=TIMEOUT,
                                receiver=None):
        """
        Read serial output for a ctest run (list command) or for a plain
        string command that is answered with AT_CMD_ENDS.

        Parameters:
            com: serial object (readline is used)
            command: a list selects the ctest branch, anything else the
                AT-command branch
            timeout: seconds without progress before reading stops
            receiver: parser handler (only used in the ctest branch)

        Returns:
            (result, status, ""). The ctest branch always reports True,
            also when it stopped because of the timeout; the other branch
            reports False on timeout.

        Raises:
            ExecuteTerminate: Scheduler.is_execute became falsy.
        """
        # Imported locally, presumably to avoid a circular import - confirm.
        from xdevice import Scheduler

        result = ""
        is_ctest = isinstance(command, list)
        start = time.time()

        while True:
            if not Scheduler.is_execute:
                raise ExecuteTerminate("Execute terminate", error_no="00300")
            data = com.readline().decode('gbk', errors='ignore')
            data = PATTERN.sub('', data)
            if is_ctest:
                if len(data.strip()) > 0:
                    # Prefix every non-blank line with a timestamp.
                    data = "{} {}".format(get_current_time(), data)
                    if data and receiver:
                        receiver.__read__(data.replace("\r", ""))
                    result = "{}{}".format(result, data.replace("\r", ""))
                    # A suite summary line restarts the timeout window.
                    if re.search(r"\d+\s+Tests\s+\d+\s+Failures\s+\d+\s+"
                                 r"Ignored", data):
                        start = time.time()
                    if CTEST_END_SIGN in data:
                        break
                if (int(time.time()) - int(start)) > timeout:
                    break
            else:
                result = "{}{}".format(
                    result, data.replace("\r", "").replace("\n", "").strip())
                if AT_CMD_ENDS in data:
                    return result, True, ""
                if (int(time.time()) - int(start)) > timeout:
                    return result, False, ""

        if receiver:
            receiver.__done__()
        LOG.info('Info: execute command success')
        return result, True, ""

    @staticmethod
    def read_local_output(com=None, command=None, case_type="",
                          timeout=TIMEOUT, receiver=None):
        """
        Dispatch to the ctest reader for DeviceTestType.ctest_lite and to
        the generic test reader for every other case type.
        """
        if case_type == DeviceTestType.ctest_lite:
            return LiteHelper.read_local_output_ctest(com, command,
                                                      timeout, receiver)
        return LiteHelper.read_local_output_test(com, command,
                                                 timeout, receiver)

    @staticmethod
    def execute_local_cmd_with_timeout(com, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Keyword arguments:
            command: str (encoded as UTF-8 and terminated with "\\r\\n") or
                any other object, which is written unchanged
            case_type: test type used to select the reader (default "")
            timeout: timeout for read result (default TIMEOUT)
            receiver: parser handler (default None)

        Returns:
            whatever read_local_output returns for the original command.

        Raises:
            LiteDeviceConnectError: com is falsy.
        """
        command = kwargs.get("command", None)
        case_type = kwargs.get("case_type", "")
        timeout = kwargs.get("timeout", TIMEOUT)
        receiver = kwargs.get("receiver", None)
        if not com:
            raise LiteDeviceConnectError("local device is not connected.",
                                         error_no="00402")

        LOG.info("local_%s execute command shell %s with timeout %ss" %
                 (convert_port(com.port), command, str(timeout)))

        payload = command
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
            if payload[-2:] != b"\r\n":
                payload = payload.rstrip() + b'\r\n'
        com.write(payload)
        # The reader gets the command as passed in, not the encoded bytes.
        return LiteHelper.read_local_output(
            com, command=command, case_type=case_type, timeout=timeout,
            receiver=receiver)

    @staticmethod
    def execute_local_command(com, command):
        """
        Write command to the serial without reading any output.

        The command is encoded as UTF-8 and terminated with "\\r\\n".

        Raises:
            LiteDeviceConnectError: com is falsy.
        """
        if not com:
            raise LiteDeviceConnectError("local device is not connected.",
                                         error_no="00402")

        LOG.info(
            "local_%s execute command shell %s" % (convert_port(com.port),
                                                   command))
        command = command.encode("utf-8")
        if command[-2:] != b"\r\n":
            command = command.rstrip() + b'\r\n'
        com.write(command)

    @staticmethod
    def execute_agent_cmd_with_timeout(self, command,
                                       **kwargs):
        """
        Run a command or transfer a file through the device's HTTP agent.

        This is a staticmethod whose first parameter is named self: the
        caller passes the object explicitly. It must provide base_url,
        headers, local_source_dir and target_save_path.

        Parameters:
            self: object carrying the agent settings listed above
            command: command line for type "cmd", otherwise a file name
                appended to the local and remote paths

        Keyword arguments:
            type: one of "cmd", "put", "get", "delete"
            sync: forwarded in the "cmd" request body (default 1)

        Returns:
            always ("", True, ""); failures are only logged.
        """
        import requests

        base_url = self.base_url
        headers = self.headers
        local_source_dir = self.local_source_dir
        target_save_path = self.target_save_path
        command_type = kwargs.get("type", None)
        sync = kwargs.get("sync", 1)
        response = None
        try:
            if "cmd" == command_type:
                url = base_url + "/_processes/"
                data = {
                    'cmd': command,
                    'sync': sync,
                    'stdoutRedirect': 'file',
                    'stderrRedirect': 'file'
                }
                response = requests.post(url=url, headers=headers,
                                         data=json.dumps(data))
                if response.status_code == 200:
                    LOG.info("connect OK")
                else:
                    LOG.info("connect failed")

            elif "put" == command_type:
                url = base_url + target_save_path + command
                with open(local_source_dir + command, "rb") as data:
                    response = requests.put(url=url, headers=headers,
                                            data=data)
                if response.status_code == 200:
                    LOG.info("{} upload file to {} "
                             "success".format(local_source_dir,
                                              target_save_path))
                else:
                    LOG.info(
                        "{} upload file to {} fail".format(local_source_dir,
                                                           target_save_path))
            elif "get" == command_type:
                url = base_url + target_save_path + command
                response = requests.get(url=url, headers=headers, stream=True)
                if response.status_code == 200:
                    # The download is stored next to the source as
                    # "<command>bak".
                    with open(local_source_dir + command + "bak", "wb") \
                            as file_name:
                        for file_data in response.iter_content(chunk_size=512):
                            file_name.write(file_data)
                    LOG.info("from {} download file to {} success".format(
                        target_save_path + command,
                        local_source_dir))
                else:
                    # Two placeholders, two arguments: source and target.
                    LOG.info("{} download file to {} fail".format(
                        target_save_path + command,
                        local_source_dir))
            elif "delete" == command_type:
                url = base_url + target_save_path + command
                LOG.info(url)
                # NOTE(review): unlike the other requests this one sends
                # no headers - confirm this is intended.
                response = requests.delete(url)
                if response.status_code == 200:
                    LOG.info("delete {} file success".format(
                        target_save_path + command))
                else:
                    LOG.info("delete {} file fail".format(
                        target_save_path + command))
            else:
                LOG.info("type error")
        except Exception as error_message:
            # Best effort by design: failures are logged, not raised.
            LOG.info("error_message:{}".format(error_message))
        finally:
            # Compare with None: a Response is falsy for 4xx/5xx status
            # codes, so a truthiness test would skip closing exactly the
            # failed requests.
            if response is not None:
                response.close()

        return "", True, ""