#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import re
import time
import os
import threading

from xdevice import DeviceOsType
from xdevice import DeviceProperties
from xdevice import ConfigConst
from xdevice import DeviceLabelType
from xdevice import ModeType
from xdevice import IDevice
from xdevice import platform_logger
from xdevice import DeviceAllocationState
from xdevice import Plugin
from xdevice import exec_cmd
from xdevice import convert_serial
from xdevice import convert_mac
from xdevice import convert_ip
from xdevice import check_mode
from xdevice import Platform

from ohos.constants import ComType
from ohos.environment.dmlib_lite import LiteHelper
from ohos.error import ErrorMessage
from ohos.exception import LiteDeviceConnectError
from ohos.exception import LiteDeviceTimeout
from ohos.exception import LiteParamError

LOG = platform_logger("DeviceLite")
# Default timeout (seconds) for connections and command execution.
TIMEOUT = 90
# Default number of extra attempts made by perform_device_action.
RETRY_ATTEMPTS = 0
# File name of the lite hdc tool looked up by get_hdc_path.
HDC = "litehdc.exe"
DEFAULT_BAUD_RATE = 115200


def get_hdc_path():
    """
    Locate the lite hdc executable.

    The tool is searched for, in order, under ``<exec_dir>/resource/tools``,
    ``<top_dir>/config`` and ``<res_dir>/config``.

    Returns:
        The absolute path of the first existing candidate.

    Raises:
        LiteParamError: if the tool exists in none of the directories.
    """
    from xdevice import Variables
    search_dirs = [
        os.path.join(Variables.exec_dir, "resource/tools"),
        os.path.join(Variables.top_dir, "config"),
        os.path.join(Variables.res_dir, "config"),
    ]
    for directory in search_dirs:
        candidate = os.path.abspath(os.path.join(directory, HDC))
        if os.path.exists(candidate):
            return candidate
    raise LiteParamError(ErrorMessage.Common.Code_0301006)


def parse_available_com(com_str):
    """
    Split the raw output of the hdc tool into a list of entries.

    Carriage returns are replaced by spaces, the text is split on newlines
    and every entry is stripped of surrounding whitespace and NUL characters.

    Parameters:
        com_str: raw text to parse

    Returns:
        A list with one cleaned string per input line (empty lines are kept
        as empty strings).
    """
    return [entry.strip().strip("\x00")
            for entry in com_str.replace("\r", " ").split("\n")]


def perform_device_action(func):
    """
    Decorator adding recover-state checking and retrying to a device method.

    If the device's recover state is false the wrapped method is not called
    and ``("", "", "")`` is returned. Otherwise the method is attempted
    ``retry + 1`` times (``retry`` is read from the keyword arguments and
    defaults to RETRY_ATTEMPTS; values <= 0 mean a single attempt). The last
    caught exception is re-raised once all attempts have failed.
    """

    @functools.wraps(func)
    def device_action(self, *args, **kwargs):
        if not self.get_recover_state():
            LOG.debug("Device %s %s is false" % (self.device_sn,
                                                 ConfigConst.recover_state))
            return "", "", ""

        extra_attempts = int(kwargs.get("retry", RETRY_ATTEMPTS))
        attempts = extra_attempts + 1 if extra_attempts > 0 else 1
        exception = None
        for num in range(attempts):
            try:
                return func(self, *args, **kwargs)
            except LiteDeviceTimeout as error:
                LOG.error(error)
                exception = error
                # NOTE(review): recovery is skipped after the first timeout
                # (num == 0) and only runs on later ones -- confirm intended.
                if num:
                    self.recover_device()
            except Exception as error:
                # Remembered and re-raised after the final attempt.
                LOG.error(error)
                exception = error
        raise exception

    return device_action


@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
class DeviceLite(IDevice):
    """
    Class representing a device lite device.

    Each object of this class represents one device lite device in xDevice.

    Attributes:
        device_sn: serial built by __set_serial__ ("remote_<ip>_<port>" or
            "local_<com>")
        label: device label taken from the configuration
        device_connect_type: "local" (serial) or "remote" (telnet), set by
            set_connect_type
        device: controller object (LocalController or RemoteController)
        ifconfig: optional command read from the configuration and sent to
            the device after connecting / rebooting
    """
    device_os_type = DeviceOsType.lite
    device_allocation_state = DeviceAllocationState.available

    def __init__(self):
        self.device_sn = ""
        self.label = ""
        self.device_connect_type = ""
        self.device_kernel = ""
        self.device = None
        self.ifconfig = None
        self.device_id = None
        self.extend_value = {}
        self.device_lock = threading.RLock()
        self.test_platform = Platform.ohos
        self.device_props = {}
        self.device_description = {}

    def init_description(self):
        """
        Fill device_description once; later calls are no-ops.
        """
        if self.device_description:
            return
        desc = {
            DeviceProperties.sn: convert_serial(self.device_sn),
            DeviceProperties.model: self.label,
            DeviceProperties.type_: self.label,
            DeviceProperties.platform: self.test_platform,
            DeviceProperties.version: "",
            DeviceProperties.others: self.device_props
        }
        self.device_description.update(desc)

    def __set_serial__(self, device=None):
        """
        Derive device_sn from the first config entry that carries either
        "ip" and "port" (remote) or "type" and "com" (local).
        """
        for item in device:
            if "ip" in item and "port" in item:
                self.device_sn = "remote_%s_%s" % \
                                 (item.get("ip"), item.get("port"))
                break
            elif "type" in item and "com" in item:
                self.device_sn = "local_%s" % item.get("com")
                break

    def __get_serial__(self):
        return self.device_sn

    def get(self, key=None, default=None):
        """
        Look up ``key`` as an attribute of this object, falling back to
        extend_value (and then ``default``) when the attribute is missing
        or falsy.
        """
        if not key:
            return default
        value = getattr(self, key, None)
        if value:
            return value
        return self.extend_value.get(key, default)

    def update_device_props(self, props):
        """
        Store ``props`` as device properties; ignored when properties were
        already set or ``props`` is not a dict.
        """
        if self.device_props or not isinstance(props, dict):
            return
        self.device_props.update(props)

    def __set_device_kernel__(self, kernel_type=""):
        self.device_kernel = kernel_type

    def __get_device_kernel__(self):
        return self.device_kernel

    @staticmethod
    def _check_watchgt(device):
        """
        Validate a watchGT configuration.

        Returns:
            True as soon as an entry's com port is reported by the hdc tool.

        Raises:
            LiteParamError: if an entry has no label, has no (or an empty)
                com, or names a com port the hdc tool does not list.
        """
        # NOTE(review): unlike the other checkers, every entry is required
        # to carry both "label" and "com" here -- confirm against the
        # expected watchGT config layout.
        for item in device:
            if "label" not in item:
                raise LiteParamError(ErrorMessage.Config.Code_0302027)
            if not item.get("com"):
                raise LiteParamError(ErrorMessage.Config.Code_0302028)
            com_list = parse_available_com(exec_cmd([get_hdc_path()]))
            if item.get("com").upper() in com_list:
                return True
            raise LiteParamError(ErrorMessage.Config.Code_0302029)

    @staticmethod
    def _check_wifiiot_config(device):
        """
        Validate a wifiiot configuration.

        Every entry without a label must have a non-empty "com" and a
        non-empty "type", and at least two distinct types must be present.

        Raises:
            LiteParamError: if any of these conditions is violated.
        """
        com_type_set = set()
        for item in device:
            if "label" in item:
                continue
            if not item.get("com"):
                raise LiteParamError(ErrorMessage.Config.Code_0302030)
            # A present-but-empty type is rejected just like a missing one
            # (same rule as for "com" above).
            if not item.get("type"):
                raise LiteParamError(ErrorMessage.Config.Code_0302031)
            com_type_set.add(item.get("type"))
        if len(com_type_set) < 2:
            raise LiteParamError(ErrorMessage.Config.Code_0302032)

    @staticmethod
    def _check_ipcamera_local(device):
        """
        Validate a local ipcamera configuration: every entry without a
        label needs a non-empty "com".

        Raises:
            LiteParamError: if a com entry is missing or empty.
        """
        for item in device:
            if "label" not in item and not item.get("com"):
                raise LiteParamError(ErrorMessage.Config.Code_0302033)

    @staticmethod
    def _check_ipcamera_remote(device=None):
        """
        Validate a remote ipcamera configuration: every entry without a
        label needs a "port", and a non-empty port must be numeric.

        Raises:
            LiteParamError: if the port is missing or not numeric.
        """
        for item in device:
            if "label" in item:
                continue
            if "port" not in item:
                raise LiteParamError(ErrorMessage.Config.Code_0302035)
            port = item.get("port")
            # str() keeps a non-string port (e.g. an int) from raising
            # AttributeError instead of being validated.
            if port and not str(port).isnumeric():
                raise LiteParamError(ErrorMessage.Config.Code_0302034)

    def __check_config__(self, device=None):
        """
        Determine label / connect type and run the matching config check.
        """
        self.set_connect_type(device)
        if self.label == DeviceLabelType.wifiiot:
            self._check_wifiiot_config(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "local":
            self._check_ipcamera_local(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "remote":
            self._check_ipcamera_remote(device)
        elif self.label == DeviceLabelType.watch_gt:
            self._check_watchgt(device)

    def set_connect_type(self, device):
        """
        Read label and connect type from the configuration entries.

        An entry with "com" selects "local"; an entry with a valid dotted
        IPv4 "ip" selects "remote".

        Raises:
            LiteParamError: on an invalid ip, a missing or unsupported
                label, or when no connect type could be determined.
        """
        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
                  r'01]?\d\d?)$'
        for item in device:
            if "label" in item:
                self.label = item.get("label")
            if "com" in item:
                self.device_connect_type = "local"
            if "ip" in item:
                if re.match(pattern, item.get("ip")):
                    self.device_connect_type = "remote"
                else:
                    raise LiteParamError(ErrorMessage.Config.Code_0302025)
        if not self.label:
            raise LiteParamError(ErrorMessage.Config.Code_0302026)
        supported_labels = (DeviceLabelType.wifiiot,
                            DeviceLabelType.ipcamera,
                            DeviceLabelType.watch_gt)
        if self.label not in supported_labels:
            raise LiteParamError(ErrorMessage.Config.Code_0302020)
        if not self.device_connect_type:
            raise LiteParamError(ErrorMessage.Config.Code_0302021)

    def __init_device__(self, device):
        """
        Validate the configuration, create the controller and, when an
        "ifconfig" command is configured, connect and send it.
        """
        self.__check_config__(device)
        self.__set_serial__(device)
        if self.device_connect_type == "remote":
            self.device = CONTROLLER_DICT.get("remote")(device)
        else:
            self.device = CONTROLLER_DICT.get("local")(device)

        # The ifconfig command is read from the second config entry.
        self.ifconfig = device[1].get("ifconfig")
        if self.ifconfig:
            self.connect()
            self.execute_command_with_timeout(command='\r', timeout=5)
            self.execute_command_with_timeout(command=self.ifconfig,
                                              timeout=5)

    def connect(self):
        """
        Connect the device

        Raises:
            LiteDeviceConnectError: re-raised from the controller; in decc
                mode the device is additionally marked unusable and its
                recover state is set to false.
        """
        try:
            self.device.connect()
        except LiteDeviceConnectError as _:
            if check_mode(ModeType.decc):
                LOG.debug("Set device %s recover state to false" %
                          self.device_sn)
                self.device_allocation_state = DeviceAllocationState.unusable
                self.set_recover_state(False)
            raise

    @perform_device_action
    def execute_command_with_timeout(self, command="", case_type="",
                                     timeout=TIMEOUT, **kwargs):
        """Executes command on the device.

        Args:
            command: the command to execute
            case_type: CTest or CppTest
            timeout: timeout for read result
            **kwargs: receiver - parser handler input

        Returns:
            (filter_result, status, error_message)

            filter_result: command execution result, with every "BS"
                occurrence removed
            status: true or false
            error_message: command execution error message
        """
        receiver = kwargs.get("receiver", None)
        if self.device_connect_type == "remote":
            LOG.info("%s execute command shell %s with timeout %ss" %
                     (convert_serial(self.__get_serial__()), command,
                      str(timeout)))
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    timeout=timeout,
                    receiver=receiver)
        elif self.device_connect_type == "agent":
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver, type="cmd")
        else:
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver)

        cleaned_result = filter_result.replace("BS", "")
        if not receiver:
            LOG.debug("{} execute result:{}".format(
                convert_serial(self.__get_serial__()),
                convert_mac(cleaned_result)))
        if not status:
            LOG.debug("{} error_message:{}".format(
                convert_serial(self.__get_serial__()),
                convert_mac(error_message.replace("BS", ""))))

        return cleaned_result, status, error_message

    def recover_device(self):
        """
        Recover the device by rebooting it.
        """
        self.reboot()

    def reboot(self):
        """
        Reconnect and send "reset" to the device.

        When the reset produces no output in decc mode, the device is
        marked unusable and its recover state is set to false. When an
        ifconfig command is configured, the prompt is probed afterwards
        and the network is configured again on success.
        """
        self.connect()
        filter_result, _, _ = self.execute_command_with_timeout(
            command="reset", timeout=30)
        if not filter_result:
            if check_mode(ModeType.decc):
                LOG.debug("Set device %s recover state to false" %
                          self.device_sn)
                self.device_allocation_state = DeviceAllocationState.unusable
                self.set_recover_state(False)
        if self.ifconfig:
            enter_result, _, _ = self.execute_command_with_timeout(
                command='\r',
                timeout=15)
            # "hisilicon #" must be tested first: it contains " #", so the
            # generic prompt test would otherwise always shadow it.
            # (presumably the bootloader prompt -- verify)
            if "hisilicon #" in enter_result:
                LOG.info("Reset device %s fail" % self.device_sn)
            elif " #" in enter_result:
                # " #" also matches the "OHOS #" prompt.
                LOG.info("Reset device %s success" % self.device_sn)
                self.execute_command_with_timeout(command=self.ifconfig,
                                                  timeout=5)

            self.execute_command_with_timeout(
                command="ifconfig",
                timeout=5)

    def close(self):
        """
        Close the telnet connection with device server or close the local
        serial
        """
        self.device.close()

    def set_recover_state(self, state):
        """
        Store the recover state under the device lock.
        """
        with self.device_lock:
            setattr(self, ConfigConst.recover_state, state)

    def get_recover_state(self, default_state=True):
        """
        Read the recover state under the device lock; ``default_state`` is
        returned when it was never set.
        """
        with self.device_lock:
            state = getattr(self, ConfigConst.recover_state, default_state)
            return state


class RemoteController:
    """
    Class representing a device lite remote device.
    Each object of this class represents one device lite remote device
    in xDevice.
    """

    def __init__(self, device):
        # Host and port are read from the second config entry.
        self.host = device[1].get("ip")
        self.port = int(device[1].get("port"))
        self.telnet = None

    def connect(self):
        """
        Connect the device server

        Returns:
            The telnet connection; an already open one is reused.

        Raises:
            LiteDeviceConnectError: if the connection cannot be opened.
        """
        try:
            if self.telnet:
                return self.telnet
            import telnetlib
            self.telnet = telnetlib.Telnet(self.host, self.port,
                                           timeout=TIMEOUT)
        except Exception as error:
            raise LiteDeviceConnectError(
                ErrorMessage.Device.Code_0303008.format(
                    convert_ip(self.host), self.port,
                    str(error))) from error
        # Pause after opening a new connection before it is used
        # (presumably to let the server settle -- verify).
        time.sleep(2)
        self.telnet.set_debuglevel(0)
        return self.telnet

    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
                                     receiver=None):
        """
        Executes command on the device.

        Parameters:
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler
        """
        return LiteHelper.execute_remote_cmd_with_timeout(
            self.telnet, command, timeout, receiver)

    def close(self):
        """
        Close the telnet connection with device server
        """
        if not self.telnet:
            return
        try:
            self.telnet.close()
        except ConnectionError as _:
            error_message = "Remote device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")
        finally:
            # Drop the handle even when closing failed, so that the next
            # connect() opens a fresh connection instead of reusing it.
            self.telnet = None


class LocalController:
    """
    Controller for a locally attached device, holding one ComController
    per configured serial port type (cmd / deploy).
    """

    def __init__(self, device):
        """
        Init Local device.
        Parameters:
            device: local device
        """
        self.com_dict = {}
        for item in device:
            if "com" not in item:
                continue
            for com_type in (ComType.cmd_com, ComType.deploy_com):
                if "type" in item and com_type == item.get("type"):
                    self.com_dict[com_type] = ComController(item)
                    break

    def connect(self, key=ComType.cmd_com):
        """
        Open serial.
        """
        self.com_dict.get(key).connect()

    def close(self, key=ComType.cmd_com):
        """
        Close serial.
        """
        if self.com_dict and self.com_dict.get(key):
            self.com_dict.get(key).close()

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Recognised keyword arguments: key (serial port type, cmd by
        default), command, case_type, receiver and timeout.
        """
        key = kwargs.get("key", ComType.cmd_com)
        return self.com_dict.get(key).execute_command_with_timeout(
            command=kwargs.get("command", None),
            case_type=kwargs.get("case_type", ""),
            timeout=kwargs.get("timeout", TIMEOUT),
            receiver=kwargs.get("receiver", None))


class ComController:
    """
    Wrapper around a single serial port.
    """

    def __init__(self, device):
        """
        Init serial.
        Parameters:
            device: local com
        """
        self.is_open = False
        self.com = None
        self.serial_port = device.get("com", None)
        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
        self.timeout = int(device.get("timeout", TIMEOUT))
        self.usb_port = device.get("usb_port", None)

    def connect(self):
        """
        Open serial.

        Returns:
            The serial object; an already open one is returned as is.

        Raises:
            LiteDeviceConnectError: if the port cannot be opened.
        """
        try:
            if not self.is_open:
                import serial
                self.com = serial.Serial(self.serial_port,
                                         baudrate=self.baud_rate,
                                         timeout=self.timeout)
                self.is_open = True
        except Exception as error:
            raise LiteDeviceConnectError(
                ErrorMessage.Device.Code_0303009.format(
                    self.serial_port, str(error))) from error
        return self.com

    def close(self):
        """
        Close serial.
        """
        if not self.com or not self.is_open:
            return
        try:
            self.com.close()
        except ConnectionError as _:
            error_message = "Local device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")
        finally:
            # Mark the port closed even when closing failed, so that the
            # next connect() re-opens it.
            self.is_open = False

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.
        """
        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)

    def execute_command(self, command):
        """
        Execute command on the serial without returning its output.
        """
        LiteHelper.execute_local_command(self.com, command)


# Maps a connect type (see DeviceLite.set_connect_type) to its controller.
CONTROLLER_DICT = {
    "local": LocalController,
    "remote": RemoteController,
}