#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import os
import re
import telnetlib
import threading
import time

from xdevice import DeviceOsType
from xdevice import ConfigConst
from xdevice import DeviceLabelType
from xdevice import ModeType
from xdevice import IDevice
from xdevice import platform_logger
from xdevice import DeviceAllocationState
from xdevice import Plugin
from xdevice import exec_cmd
from xdevice import convert_serial
from xdevice import convert_ip
from xdevice import check_mode

from ohos.exception import LiteDeviceConnectError
from ohos.exception import LiteDeviceTimeout
from ohos.exception import LiteParamError
from ohos.environment.dmlib_lite import LiteHelper
from ohos.constants import ComType

LOG = platform_logger("DeviceLite")
# Default timeout (seconds) for telnet connect, serial reads and commands.
TIMEOUT = 90
# Default number of extra attempts made by perform_device_action.
RETRY_ATTEMPTS = 0
# File name of the tool looked up by get_hdc_path().
HDC = "litehdc.exe"
DEFAULT_BAUD_RATE = 115200

# Dotted-quad IPv4 address, each octet in 0-255.
_IPV4_PATTERN = re.compile(
    r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$')


def get_hdc_path():
    """Return the absolute path of the first ``litehdc.exe`` found.

    The tool is searched, in order, in ``<exec_dir>/resource/tools``,
    ``<top_dir>/config`` and ``<res_dir>/config``.

    Raises:
        LiteParamError: if the tool exists in none of these directories.
    """
    # Imported lazily, as in the original code.
    from xdevice import Variables
    search_dirs = [
        os.path.join(Variables.exec_dir, "resource/tools"),
        os.path.join(Variables.top_dir, "config"),
        os.path.join(Variables.res_dir, "config"),
    ]
    for directory in search_dirs:
        candidate = os.path.abspath(os.path.join(directory, HDC))
        if os.path.exists(candidate):
            return candidate
    raise LiteParamError("litehdc.exe not found", error_no="00108")


def parse_available_com(com_str):
    """Split the tool output into a list of cleaned-up lines.

    Carriage returns are replaced by spaces, the text is split on newlines,
    and every entry is stripped of surrounding whitespace and NUL characters.

    Args:
        com_str: raw text output to parse.

    Returns:
        list of str, one entry per input line (entries may be empty).
    """
    lines = com_str.replace("\r", " ").split("\n")
    return [line.strip().strip("\x00") for line in lines]


def perform_device_action(func):
    """Decorate a device method with recover-state check and retry logic.

    The wrapped call is skipped (returning ``("", "", "")``) when the device's
    recover state is false. Otherwise ``func`` is called up to ``retry + 1``
    times, where ``retry`` is read from the keyword arguments (default
    ``RETRY_ATTEMPTS``). The last caught exception is re-raised when every
    attempt fails.
    """

    @functools.wraps(func)
    def device_action(self, *args, **kwargs):
        if not self.get_recover_state():
            LOG.debug("Device %s %s is false" % (self.device_sn,
                                                 ConfigConst.recover_state))
            return "", "", ""

        # Non-positive retry values still yield exactly one attempt.
        attempts = max(int(kwargs.get("retry", RETRY_ATTEMPTS)), 0) + 1
        exception = None
        for num in range(attempts):
            try:
                return func(self, *args, **kwargs)
            except LiteDeviceTimeout as error:
                LOG.error(error)
                exception = error
                # Recovery is only triggered for timeouts on a retry
                # (num > 0), never on the first attempt.
                if num:
                    self.recover_device()
            except Exception as error:
                LOG.error(error)
                exception = error
        raise exception

    return device_action


@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
class DeviceLite(IDevice):
    """
    Class representing a lite device.

    Each object of this class represents one lite device in xDevice.

    Attributes:
        device_connect_type: A string that's the type of lite device
    """
    device_os_type = DeviceOsType.lite
    device_allocation_state = DeviceAllocationState.available

    def __init__(self):
        self.device_sn = ""
        self.label = ""
        self.device_connect_type = ""
        self.device_kernel = ""
        self.device = None
        self.ifconfig = None
        self.extend_value = {}
        self.device_lock = threading.RLock()

    def __set_serial__(self, device=None):
        """Derive ``device_sn`` from the first item that identifies a device.

        An item with ``ip`` and ``port`` gives ``remote_<ip>_<port>``; an item
        with ``type`` and ``com`` gives ``local_<com>``.
        """
        for item in device or []:
            if "ip" in item and "port" in item:
                self.device_sn = "remote_%s_%s" % \
                                 (item.get("ip"), item.get("port"))
                break
            elif "type" in item and "com" in item:
                self.device_sn = "local_%s" % item.get("com")
                break

    def __get_serial__(self):
        """Return the device serial number string."""
        return self.device_sn

    def get(self, key=None, default=None):
        """Look up ``key`` as an attribute, falling back to ``extend_value``.

        A falsy attribute value is treated as missing, in which case
        ``extend_value.get(key, default)`` is returned.
        """
        if not key:
            return default
        value = getattr(self, key, None)
        if value:
            return value
        return self.extend_value.get(key, default)

    def __set_device_kernel__(self, kernel_type=""):
        """Store the kernel type of the device."""
        self.device_kernel = kernel_type

    def __get_device_kernel__(self):
        """Return the stored kernel type of the device."""
        return self.device_kernel

    @staticmethod
    def _check_watchgt(device):
        """Validate a watchGT configuration.

        Returns True as soon as an item carries a label and a com port that
        is listed in the output of the hdc tool.

        Raises:
            LiteParamError: on a missing label, an empty com, or a com port
                that the hdc tool does not list.
        """
        for item in device:
            if "label" not in item:
                error_message = "watchGT local label does not exist"
                raise LiteParamError(error_message, error_no="00108")
            if not item.get("com"):
                error_message = "watchGT local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            hdc = get_hdc_path()
            result = exec_cmd([hdc])
            com_list = parse_available_com(result)
            if item.get("com").upper() in com_list:
                return True
            error_message = "watchGT local com does not exist"
            raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_wifiiot_config(device):
        """Validate a wifiiot configuration.

        Every item without a ``label`` key must carry a non-empty ``com`` and
        a non-empty ``type``; at least two distinct types must be present.

        Raises:
            LiteParamError: if any of the above does not hold.
        """
        com_type_set = set()
        for item in device:
            if "label" in item:
                continue
            if not item.get("com"):
                error_message = "wifiiot local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            # An empty type value is rejected as well as a missing one.
            if not item.get("type"):
                error_message = "wifiiot com type cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            com_type_set.add(item.get("type"))
        if len(com_type_set) < 2:
            error_message = "wifiiot need cmd com and deploy com" \
                            " at the same time, please check"
            raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_ipcamera_local(device):
        """Validate a local ipcamera configuration.

        Raises:
            LiteParamError: if an item without a ``label`` key has no
                non-empty ``com`` value.
        """
        for item in device:
            if "label" not in item and not item.get("com"):
                error_message = "ipcamera local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_ipcamera_remote(device=None):
        """Validate a remote ipcamera configuration.

        Raises:
            LiteParamError: if an item without a ``label`` key has a missing,
                empty or non-numeric ``port`` value.
        """
        for item in device or []:
            if "label" in item:
                continue
            port = item.get("port")
            # A present-but-empty port is rejected here; it would otherwise
            # fail later in int() inside RemoteController.__init__.
            if not port:
                error_message = "ipcamera remote port cannot be" \
                                " empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            if not str(port).isnumeric():
                error_message = "ipcamera remote port should be " \
                                "a number, please check"
                raise LiteParamError(error_message, error_no="00108")

    def __check_config__(self, device=None):
        """Resolve label/connect type, then run the label-specific check."""
        self.set_connect_type(device)
        if self.label == DeviceLabelType.wifiiot:
            self._check_wifiiot_config(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "local":
            self._check_ipcamera_local(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "remote":
            self._check_ipcamera_remote(device)
        elif self.label == DeviceLabelType.watch_gt:
            self._check_watchgt(device)

    def set_connect_type(self, device):
        """Set ``label`` and ``device_connect_type`` from the device items.

        An item with ``com`` marks the device as "local"; an item with a
        valid IPv4 ``ip`` marks it as "remote".

        Raises:
            LiteParamError: on a malformed ip, an empty or unsupported label,
                or when neither com nor ip is configured.
        """
        for item in device or []:
            if "label" in item:
                self.label = item.get("label")
            if "com" in item:
                self.device_connect_type = "local"
            if "ip" in item:
                if _IPV4_PATTERN.match(item.get("ip") or ""):
                    self.device_connect_type = "remote"
                else:
                    error_message = "Remote device ip not in right " \
                                    "format, please check user_config.xml"
                    raise LiteParamError(error_message, error_no="00108")
        if not self.label:
            error_message = "device label cannot be empty, " \
                            "please check"
            raise LiteParamError(error_message, error_no="00108")
        supported_labels = (DeviceLabelType.wifiiot,
                            DeviceLabelType.ipcamera,
                            DeviceLabelType.watch_gt)
        if self.label not in supported_labels:
            error_message = "device label should be ipcamera or" \
                            " wifiiot, please check"
            raise LiteParamError(error_message, error_no="00108")
        if not self.device_connect_type:
            error_message = "device com or ip cannot be empty, " \
                            "please check"
            raise LiteParamError(error_message, error_no="00108")

    def __init_device__(self, device):
        """Validate ``device`` and build the matching controller."""
        self.__check_config__(device)
        self.__set_serial__(device)
        if self.device_connect_type == "remote":
            self.device = CONTROLLER_DICT.get("remote")(device)
        else:
            self.device = CONTROLLER_DICT.get("local")(device)

        # The second item of the device list carries the ifconfig command.
        self.ifconfig = device[1].get("ifconfig")

    def _mark_unusable_in_decc(self):
        """In decc mode, flag the device unusable and disable recovery."""
        if check_mode(ModeType.decc):
            LOG.debug("Set device %s recover state to false" %
                      self.device_sn)
            self.device_allocation_state = DeviceAllocationState.unusable
            self.set_recover_state(False)

    def connect(self):
        """
        Connect the device

        Raises:
            LiteDeviceConnectError: re-raised from the controller; in decc
                mode the device is first marked unusable.
        """
        try:
            self.device.connect()
        except LiteDeviceConnectError:
            self._mark_unusable_in_decc()
            raise

    @perform_device_action
    def execute_command_with_timeout(self, command="", case_type="",
                                     timeout=TIMEOUT, **kwargs):
        """Executes command on the device.

        Args:
            command: the command to execute
            case_type: CTest or CppTest
            timeout: timeout for read result
            **kwargs: receiver - parser handler input

        Returns:
            (filter_result, status, error_message)

            filter_result: command execution result
            status: true or false
            error_message: command execution error message
        """
        receiver = kwargs.get("receiver", None)
        if self.device_connect_type == "remote":
            LOG.info("%s execute command shell %s with timeout %ss" %
                     (convert_serial(self.__get_serial__()), command,
                      str(timeout)))
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    timeout=timeout,
                    receiver=receiver)
        elif self.device_connect_type == "agent":
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver, type="cmd")
        else:
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver)
        # The raw result is only logged when no receiver consumes it.
        if not receiver:
            LOG.debug("%s execute result:%s" % (
                convert_serial(self.__get_serial__()), filter_result))
        if not status:
            LOG.debug(
                "%s error_message:%s" % (convert_serial(self.__get_serial__()),
                                         error_message))
        return filter_result, status, error_message

    def recover_device(self):
        """Recover the device by rebooting it."""
        self.reboot()

    def reboot(self):
        """Send ``reset`` to the device and re-apply its ifconfig command.

        When the reset yields no output the device is marked unusable in decc
        mode. If an ifconfig command is configured, the prompt is probed with
        a carriage return and the command is sent when a shell prompt is seen.
        """
        self.connect()
        filter_result, _, _ = self.execute_command_with_timeout(
            command="reset", timeout=30)
        if not filter_result:
            self._mark_unusable_in_decc()
        if self.ifconfig:
            enter_result, _, _ = self.execute_command_with_timeout(
                command='\r', timeout=15)
            if " #" in enter_result or "OHOS #" in enter_result:
                LOG.info("Reset device %s success" % self.device_sn)
                self.execute_command_with_timeout(command=self.ifconfig,
                                                  timeout=5)
            elif "hisilicon #" in enter_result:
                LOG.info("Reset device %s fail" % self.device_sn)

            # The result is not inspected here; the command is still issued.
            self.execute_command_with_timeout(command="ifconfig", timeout=5)

    def close(self):
        """
        Close the telnet connection with device server or close the local
        serial
        """
        self.device.close()

    def set_recover_state(self, state):
        """Set the recover state attribute under the device lock."""
        with self.device_lock:
            setattr(self, ConfigConst.recover_state, state)

    def get_recover_state(self, default_state=True):
        """Return the recover state, or ``default_state`` if never set."""
        with self.device_lock:
            state = getattr(self, ConfigConst.recover_state, default_state)
            return state


class RemoteController:
    """
    Class representing a remote lite device.
    Each object of this class represents one remote lite device
    in xDevice.
    """

    def __init__(self, device):
        # The second item of the device list carries ip and port.
        self.host = device[1].get("ip")
        self.port = int(device[1].get("port"))
        self.telnet = None

    def connect(self):
        """
        Connect the device server

        Returns:
            the telnet connection; an existing connection is reused.

        Raises:
            LiteDeviceConnectError: if the telnet connection cannot be opened.
        """
        if self.telnet:
            return self.telnet
        try:
            self.telnet = telnetlib.Telnet(self.host, self.port,
                                           timeout=TIMEOUT)
        except Exception as err_msgs:
            error_message = "Connect remote lite device failed, host is %s, " \
                            "port is %s, error is %s" % \
                            (convert_ip(self.host), self.port, str(err_msgs))
            raise LiteDeviceConnectError(
                error_message, error_no="00401") from err_msgs
        time.sleep(2)
        self.telnet.set_debuglevel(0)
        return self.telnet

    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
                                     receiver=None):
        """
        Executes command on the device.

        Parameters:
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler
        """
        return LiteHelper.execute_remote_cmd_with_timeout(
            self.telnet, command, timeout, receiver)

    def close(self):
        """
        Close the telnet connection with device server
        """
        try:
            if not self.telnet:
                return
            self.telnet.close()
            self.telnet = None
        except Exception:
            # Best effort: a failing close is logged, not propagated.
            error_message = "Remote device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")


class LocalController:
    """Controller for a local device, holding one ComController per com type."""

    def __init__(self, device):
        """
        Init Local device.
        Parameters:
            device: local device
        """
        self.com_dict = {}
        for item in device:
            if "com" not in item:
                continue
            com_type = item.get("type")
            # Only the cmd and deploy com types are registered.
            if com_type == ComType.cmd_com:
                self.com_dict[ComType.cmd_com] = ComController(item)
            elif com_type == ComType.deploy_com:
                self.com_dict[ComType.deploy_com] = ComController(item)

    def connect(self, key=ComType.cmd_com):
        """
        Open serial.
        """
        self.com_dict.get(key).connect()

    def close(self, key=ComType.cmd_com):
        """
        Close serial.
        """
        if self.com_dict and self.com_dict.get(key):
            self.com_dict.get(key).close()

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Recognized keyword arguments: key (com type, default cmd com),
        command, case_type, receiver and timeout.
        """
        key = kwargs.get("key", ComType.cmd_com)
        command = kwargs.get("command", None)
        case_type = kwargs.get("case_type", "")
        receiver = kwargs.get("receiver", None)
        timeout = kwargs.get("timeout", TIMEOUT)
        return self.com_dict.get(key).execute_command_with_timeout(
            command=command, case_type=case_type,
            timeout=timeout, receiver=receiver)


class ComController:
    """Controller for a single serial port."""

    def __init__(self, device):
        """
        Init serial.
        Parameters:
            device: local com
        """
        self.is_open = False
        self.com = None
        self.serial_port = device.get("com", None)
        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
        self.timeout = int(device.get("timeout", TIMEOUT))
        self.usb_port = device.get("usb_port", None)

    def connect(self):
        """
        Open serial.

        Raises:
            LiteDeviceConnectError: if the serial port cannot be opened.
        """
        try:
            if not self.is_open:
                # Imported lazily so the module loads without pyserial.
                import serial
                self.com = serial.Serial(self.serial_port,
                                         baudrate=self.baud_rate,
                                         timeout=self.timeout)
                self.is_open = True
        except Exception as error_msg:
            error = "connect %s serial failed, please make sure this port is" \
                    " not occupied, error is %s[00401]" % \
                    (self.serial_port, str(error_msg))
            raise LiteDeviceConnectError(error, error_no="00401") from error_msg

    def close(self):
        """
        Close serial.
        """
        try:
            if not self.com:
                return
            if self.is_open:
                self.com.close()
            self.is_open = False
        except Exception:
            # Best effort: a failing close is logged, not propagated.
            error_message = "Local device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.
        """
        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)

    def execute_command(self, command):
        """
        Execute command on the serial without returning its output.
        """
        LiteHelper.execute_local_command(self.com, command)


# Maps a connect type to the controller class that handles it.
CONTROLLER_DICT = {
    "local": LocalController,
    "remote": RemoteController,
}