#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import re
import telnetlib
import time
import os
import threading

from xdevice import DeviceOsType
from xdevice import ConfigConst
from xdevice import DeviceLabelType
from xdevice import ModeType
from xdevice import IDevice
from xdevice import platform_logger
from xdevice import DeviceAllocationState
from xdevice import Plugin
from xdevice import exec_cmd
from xdevice import convert_serial
from xdevice import convert_ip
from xdevice import check_mode

from ohos.exception import LiteDeviceConnectError
from ohos.exception import LiteDeviceTimeout
from ohos.exception import LiteParamError
from ohos.environment.dmlib_lite import LiteHelper
from ohos.constants import ComType

LOG = platform_logger("DeviceLite")
TIMEOUT = 90
RETRY_ATTEMPTS = 0
HDC = "litehdc.exe"
DEFAULT_BAUD_RATE = 115200


def get_hdc_path():
    """
    Locate the litehdc executable.

    The executable is searched, in order, in ``<exec_dir>/resource/tools``,
    ``<top_dir>/config`` and ``<res_dir>/config``.

    Returns:
        Absolute path of the first existing ``litehdc.exe``.

    Raises:
        LiteParamError: if the executable is found in none of the directories.
    """
    from xdevice import Variables
    search_dirs = [
        os.path.join(Variables.exec_dir, "resource/tools"),
        os.path.join(Variables.top_dir, "config"),
        os.path.join(Variables.res_dir, "config"),
    ]
    for directory in search_dirs:
        candidate = os.path.abspath(os.path.join(directory, HDC))
        if os.path.exists(candidate):
            return candidate
    raise LiteParamError("litehdc.exe not found", error_no="00108")


def parse_available_com(com_str):
    """
    Split raw tool output into a list of cleaned lines.

    Carriage returns are replaced by spaces, the text is split on newlines,
    and each entry is stripped of surrounding whitespace and NUL characters.

    Args:
        com_str: text to parse

    Returns:
        list of cleaned strings, one per input line
    """
    lines = com_str.replace("\r", " ").split("\n")
    return [line.strip().strip("\x00") for line in lines]


def perform_device_action(func):
    """
    Decorator adding recover-state checking and retry handling to a device
    method.

    If the device recover state is false, the wrapped method is not called
    and ``("", "", "")`` is returned. Otherwise the method is attempted once,
    plus ``retry`` additional times when a positive ``retry`` keyword
    argument is given (default ``RETRY_ATTEMPTS``). When an attempt other
    than the first one fails with ``LiteDeviceTimeout``,
    ``self.recover_device()`` is called. If every attempt fails, the last
    exception is raised.
    """

    @functools.wraps(func)
    def device_action(self, *args, **kwargs):
        if not self.get_recover_state():
            LOG.debug("Device %s %s is false" % (self.device_sn,
                                                 ConfigConst.recover_state))
            return "", "", ""

        extra = int(kwargs.get("retry", RETRY_ATTEMPTS))
        attempts = extra + 1 if extra > 0 else 1
        last_error = None
        for attempt in range(attempts):
            try:
                return func(self, *args, **kwargs)
            except LiteDeviceTimeout as error:
                LOG.error(error)
                last_error = error
                # Recovery is only triggered on retry attempts, not on the
                # first failure.
                if attempt:
                    self.recover_device()
            except Exception as error:
                LOG.error(error)
                last_error = error
        raise last_error

    return device_action


@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
class DeviceLite(IDevice):
    """
    Class representing a lite device.

    Each object of this class represents one lite device in xDevice.

    Attributes:
        device_connect_type: A string that's the type of lite device
    """
    device_os_type = DeviceOsType.lite
    device_allocation_state = DeviceAllocationState.available

    def __init__(self):
        self.device_sn = ""
        self.label = ""
        self.device_connect_type = ""
        self.device_kernel = ""
        self.device = None
        self.ifconfig = None
        self.device_id = None
        self.extend_value = {}
        self.device_lock = threading.RLock()

    def __set_serial__(self, device=None):
        """
        Derive the device serial from the first matching config item.

        An item holding both "ip" and "port" yields ``remote_<ip>_<port>``;
        otherwise an item holding both "type" and "com" yields
        ``local_<com>``.
        """
        for item in device:
            if "ip" in item and "port" in item:
                self.device_sn = "remote_%s_%s" % \
                                 (item.get("ip"), item.get("port"))
                break
            if "type" in item and "com" in item:
                self.device_sn = "local_%s" % item.get("com")
                break

    def __get_serial__(self):
        """Return the device serial."""
        return self.device_sn

    def get(self, key=None, default=None):
        """
        Look up a value by name.

        Returns the attribute named ``key`` when it is truthy, otherwise the
        entry from ``extend_value`` (falling back to ``default``). An empty
        ``key`` returns ``default``.
        """
        if not key:
            return default
        value = getattr(self, key, None)
        if value:
            return value
        return self.extend_value.get(key, default)

    def __set_device_kernel__(self, kernel_type=""):
        """Store the kernel type of the device."""
        self.device_kernel = kernel_type

    def __get_device_kernel__(self):
        """Return the kernel type of the device."""
        return self.device_kernel

    @staticmethod
    def _check_watchgt(device):
        """
        Validate a watchGT config.

        Every inspected item must carry a "label" and a non-empty "com"; the
        com (upper-cased) must appear in the output of the litehdc tool.

        Returns:
            True as soon as the first item validates (None for an empty
            config).

        Raises:
            LiteParamError: on a missing label, empty com or unknown com.
        """
        # NOTE(review): the first item decides the result - the loop either
        # returns or raises on it; confirm this is intended for
        # multi-item configs.
        for item in device:
            if "label" not in item:
                error_message = "watchGT local label does not exist"
                raise LiteParamError(error_message, error_no="00108")
            if not item.get("com"):
                error_message = "watchGT local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            hdc = get_hdc_path()
            result = exec_cmd([hdc])
            com_list = parse_available_com(result)
            if item.get("com").upper() in com_list:
                return True
            error_message = "watchGT local com does not exist"
            raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_wifiiot_config(device):
        """
        Validate a wifiiot config.

        Every item without a "label" must have a non-empty "com" and a
        non-empty "type", and at least two distinct types must be present.

        Raises:
            LiteParamError: when any of the above does not hold.
        """
        com_type_set = set()
        for item in device:
            if "label" in item:
                continue
            if not item.get("com"):
                error_message = "wifiiot local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            # Reject both a missing and an empty type; an empty type must
            # not count as one of the two required com types.
            if not item.get("type"):
                error_message = "wifiiot com type cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            com_type_set.add(item.get("type"))
        if len(com_type_set) < 2:
            error_message = "wifiiot need cmd com and deploy com" \
                            " at the same time, please check"
            raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_ipcamera_local(device):
        """
        Validate a local ipcamera config: every item without a "label" must
        have a non-empty "com".

        Raises:
            LiteParamError: when a com is missing or empty.
        """
        for item in device:
            if "label" in item:
                continue
            if not item.get("com"):
                error_message = "ipcamera local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_ipcamera_remote(device=None):
        """
        Validate a remote ipcamera config: every item without a "label" must
        have a non-empty, numeric "port".

        Raises:
            LiteParamError: when a port is missing, empty or not a number.
        """
        for item in device:
            if "label" in item:
                continue
            port = item.get("port")
            if not port:
                # Covers both a missing key and an empty value, which would
                # otherwise fail later when the port is converted to int.
                error_message = "ipcamera remote port cannot be" \
                                " empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            if not str(port).isnumeric():
                error_message = "ipcamera remote port should be " \
                                "a number, please check"
                raise LiteParamError(error_message, error_no="00108")

    def __check_config__(self, device=None):
        """
        Determine label and connect type, then run the label-specific
        config validation.
        """
        self.set_connect_type(device)
        if self.label == DeviceLabelType.wifiiot:
            self._check_wifiiot_config(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "local":
            self._check_ipcamera_local(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "remote":
            self._check_ipcamera_remote(device)
        elif self.label == DeviceLabelType.watch_gt:
            self._check_watchgt(device)

    def set_connect_type(self, device):
        """
        Read label and connect type from the config items.

        An item with "com" marks the device as "local"; an item with a
        valid dotted IPv4 "ip" marks it as "remote".

        Raises:
            LiteParamError: on a malformed ip, a missing or unsupported
                label, or when neither com nor ip is configured.
        """
        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
                  r'01]?\d\d?)$'
        for item in device:
            if "label" in item:
                self.label = item.get("label")
            if "com" in item:
                self.device_connect_type = "local"
            if "ip" in item:
                if re.match(pattern, item.get("ip")):
                    self.device_connect_type = "remote"
                else:
                    error_message = "Remote device ip not in right " \
                                    "format, please check user_config.xml"
                    raise LiteParamError(error_message, error_no="00108")

        if not self.label:
            error_message = "device label cannot be empty, " \
                            "please check"
            raise LiteParamError(error_message, error_no="00108")

        supported_labels = (DeviceLabelType.wifiiot,
                            DeviceLabelType.ipcamera,
                            DeviceLabelType.watch_gt)
        if self.label not in supported_labels:
            error_message = "device label should be ipcamera or" \
                            " wifiiot, please check"
            raise LiteParamError(error_message, error_no="00108")

        if not self.device_connect_type:
            error_message = "device com or ip cannot be empty, " \
                            "please check"
            raise LiteParamError(error_message, error_no="00108")

    def __init_device__(self, device):
        """
        Validate the config, set the serial and create the matching
        controller (remote or local).
        """
        self.__check_config__(device)
        self.__set_serial__(device)
        if self.device_connect_type == "remote":
            self.device = CONTROLLER_DICT.get("remote")(device)
        else:
            self.device = CONTROLLER_DICT.get("local")(device)

        # NOTE(review): assumes the config holds at least two items and that
        # the second one carries the optional "ifconfig" command - confirm.
        self.ifconfig = device[1].get("ifconfig")

    def connect(self):
        """
        Connect the device

        On LiteDeviceConnectError in decc mode the device is marked unusable
        and its recover state is set to false; the error is re-raised in
        every case.
        """
        try:
            self.device.connect()
        except LiteDeviceConnectError:
            if check_mode(ModeType.decc):
                LOG.debug("Set device %s recover state to false" %
                          self.device_sn)
                self.device_allocation_state = DeviceAllocationState.unusable
                self.set_recover_state(False)
            raise

    @perform_device_action
    def execute_command_with_timeout(self, command="", case_type="",
                                     timeout=TIMEOUT, **kwargs):
        """Executes command on the device.

        Args:
            command: the command to execute
            case_type: CTest or CppTest
            timeout: timeout for read result
            **kwargs: receiver - parser handler input

        Returns:
            (filter_result, status, error_message)

            filter_result: command execution result
            status: true or false
            error_message: command execution error message
        """
        receiver = kwargs.get("receiver", None)
        if self.device_connect_type == "remote":
            LOG.info("%s execute command shell %s with timeout %ss" %
                     (convert_serial(self.__get_serial__()), command,
                      str(timeout)))
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    timeout=timeout,
                    receiver=receiver)
        elif self.device_connect_type == "agent":
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver, type="cmd")
        else:
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver)
        if not receiver:
            LOG.debug("%s execute result:%s" % (
                convert_serial(self.__get_serial__()), filter_result))
        if not status:
            LOG.debug(
                "%s error_message:%s" % (convert_serial(self.__get_serial__()),
                                         error_message))
        return filter_result, status, error_message

    def recover_device(self):
        """Recover the device by rebooting it."""
        self.reboot()

    def reboot(self):
        """
        Connect and send "reset" to the device.

        If the reset yields no output in decc mode, the device is marked
        unusable and its recover state is set to false. When an ifconfig
        command is configured, the prompt is probed and the network is
        re-configured after a successful reset.
        """
        self.connect()
        filter_result, _, _ = self.execute_command_with_timeout(
            command="reset", timeout=30)
        if not filter_result:
            if check_mode(ModeType.decc):
                LOG.debug("Set device %s recover state to false" %
                          self.device_sn)
                self.device_allocation_state = DeviceAllocationState.unusable
                self.set_recover_state(False)
        if self.ifconfig:
            enter_result, _, _ = self.execute_command_with_timeout(
                command='\r', timeout=15)
            if " #" in enter_result or "OHOS #" in enter_result:
                LOG.info("Reset device %s success" % self.device_sn)
                self.execute_command_with_timeout(command=self.ifconfig,
                                                  timeout=5)
            elif "hisilicon #" in enter_result:
                LOG.info("Reset device %s fail" % self.device_sn)

            # The result is not used here; the command is still issued as in
            # the original flow.
            self.execute_command_with_timeout(command="ifconfig", timeout=5)

    def close(self):
        """
        Close the telnet connection with device server or close the local
        serial
        """
        self.device.close()

    def set_recover_state(self, state):
        """Set the recover state under the device lock."""
        with self.device_lock:
            setattr(self, ConfigConst.recover_state, state)

    def get_recover_state(self, default_state=True):
        """
        Return the recover state (``default_state`` when never set), read
        under the device lock.
        """
        with self.device_lock:
            state = getattr(self, ConfigConst.recover_state, default_state)
            return state


class RemoteController:
    """
    Class representing a lite remote device.
    Each object of this class represents one lite remote device
    in xDevice.
    """

    def __init__(self, device):
        # Host and port are read from the second config item.
        self.host = device[1].get("ip")
        self.port = int(device[1].get("port"))
        self.telnet = None

    def connect(self):
        """
        Connect the device server

        Returns:
            the telnet connection; an existing one is reused.

        Raises:
            LiteDeviceConnectError: when the connection cannot be opened.
        """
        try:
            if self.telnet:
                return self.telnet
            self.telnet = telnetlib.Telnet(self.host, self.port,
                                           timeout=TIMEOUT)
        except Exception as err_msgs:
            error_message = "Connect remote lite device failed, host is %s, " \
                            "port is %s, error is %s" % \
                            (convert_ip(self.host), self.port, str(err_msgs))
            raise LiteDeviceConnectError(error_message,
                                         error_no="00401") from err_msgs
        time.sleep(2)
        self.telnet.set_debuglevel(0)
        return self.telnet

    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
                                     receiver=None):
        """
        Executes command on the device.

        Parameters:
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler
        """
        return LiteHelper.execute_remote_cmd_with_timeout(
            self.telnet, command, timeout, receiver)

    def close(self):
        """
        Close the telnet connection with device server
        """
        try:
            if not self.telnet:
                return
            self.telnet.close()
            self.telnet = None
        except Exception:
            error_message = "Remote device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")


class LocalController:
    """Controller of a local lite device, holding one ComController per
    com type (cmd / deploy)."""

    def __init__(self, device):
        """
        Init Local device.
        Parameters:
            device: local device
        """
        self.com_dict = {}
        for item in device:
            if "com" not in item:
                continue
            com_type = item.get("type")
            if com_type == ComType.cmd_com:
                self.com_dict[ComType.cmd_com] = ComController(item)
            elif com_type == ComType.deploy_com:
                self.com_dict[ComType.deploy_com] = ComController(item)

    def connect(self, key=ComType.cmd_com):
        """
        Open serial.
        """
        self.com_dict.get(key).connect()

    def close(self, key=ComType.cmd_com):
        """
        Close serial.
        """
        if self.com_dict and self.com_dict.get(key):
            self.com_dict.get(key).close()

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Recognised keyword arguments: key (com type, default cmd com),
        command, case_type, receiver and timeout; others are ignored.
        """
        key = kwargs.get("key", ComType.cmd_com)
        command = kwargs.get("command", None)
        case_type = kwargs.get("case_type", "")
        receiver = kwargs.get("receiver", None)
        timeout = kwargs.get("timeout", TIMEOUT)
        return self.com_dict.get(key).execute_command_with_timeout(
            command=command, case_type=case_type,
            timeout=timeout, receiver=receiver)


class ComController:
    """Controller of a single serial port."""

    def __init__(self, device):
        """
        Init serial.
        Parameters:
            device: local com
        """
        self.is_open = False
        self.com = None
        self.serial_port = device.get("com", None)
        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
        self.timeout = int(device.get("timeout", TIMEOUT))
        self.usb_port = device.get("usb_port", None)

    def connect(self):
        """
        Open serial.

        Raises:
            LiteDeviceConnectError: when the port cannot be opened.
        """
        try:
            if not self.is_open:
                # Imported lazily so the module loads without pyserial.
                import serial
                self.com = serial.Serial(self.serial_port,
                                         baudrate=self.baud_rate,
                                         timeout=self.timeout)
                self.is_open = True
        except Exception as error_msg:
            error = "connect %s serial failed, please make sure this port is" \
                    " not occupied, error is %s[00401]" % \
                    (self.serial_port, str(error_msg))
            raise LiteDeviceConnectError(error,
                                         error_no="00401") from error_msg

    def close(self):
        """
        Close serial.
        """
        try:
            if not self.com:
                return
            if self.is_open:
                self.com.close()
            self.is_open = False
        except Exception:
            error_message = "Local device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.
        """
        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)

    def execute_command(self, command):
        """
        Execute command on the serial and read all the output from the serial.
        """
        LiteHelper.execute_local_command(self.com, command)


# Maps the connect type to the controller class used by DeviceLite.
CONTROLLER_DICT = {
    "local": LocalController,
    "remote": RemoteController,
}