#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import re
import telnetlib
import time
import os
import threading

from xdevice import DeviceOsType
from xdevice import DeviceProperties
from xdevice import ConfigConst
from xdevice import DeviceLabelType
from xdevice import ModeType
from xdevice import IDevice
from xdevice import platform_logger
from xdevice import DeviceAllocationState
from xdevice import Plugin
from xdevice import exec_cmd
from xdevice import convert_serial
from xdevice import convert_ip
from xdevice import check_mode

from ohos.exception import LiteDeviceConnectError
from ohos.exception import LiteDeviceTimeout
from ohos.exception import LiteParamError
from ohos.environment.dmlib_lite import LiteHelper
from ohos.constants import ComType

LOG = platform_logger("DeviceLite")
TIMEOUT = 90
RETRY_ATTEMPTS = 0
HDC = "litehdc.exe"
DEFAULT_BAUD_RATE = 115200

# Dotted-quad IPv4 address, each octet limited to 0-255. Compiled once so
# that set_connect_type() does not rebuild it for every configured entry.
_IPV4_PATTERN = re.compile(
    r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|['
    r'01]?\d\d?)$')


def get_hdc_path():
    """
    Locate the litehdc executable.

    The executable named by HDC is searched for, in order, in
    ``<exec_dir>/resource/tools``, ``<top_dir>/config`` and
    ``<res_dir>/config``.

    Returns:
        The absolute path of the first existing candidate.

    Raises:
        LiteParamError: if none of the candidate paths exists.
    """
    from xdevice import Variables
    search_dirs = [
        os.path.join(Variables.exec_dir, "resource/tools"),
        os.path.join(Variables.top_dir, "config"),
        os.path.join(Variables.res_dir, "config"),
    ]

    for directory in search_dirs:
        candidate = os.path.abspath(os.path.join(directory, HDC))
        if os.path.exists(candidate):
            return candidate

    raise LiteParamError("litehdc.exe not found", error_no="00108")


def parse_available_com(com_str):
    """
    Split raw tool output into a list of cleaned lines.

    Carriage returns are replaced by spaces, the text is split on newlines
    and each entry is stripped of surrounding whitespace and NUL characters.

    Args:
        com_str: text to parse (the caller in this file passes the result
            of running the litehdc executable)

    Returns:
        A list with one cleaned string per input line (empty strings are
        kept).
    """
    lines = com_str.replace("\r", " ").split("\n")
    return [line.strip().strip("\x00") for line in lines]


def perform_device_action(func):
    """
    Decorator adding recover-state checking and retry to a device method.

    The wrapped method is skipped, returning ``("", "", "")``, when the
    device's recover state is false. Otherwise it is attempted
    ``retry + 1`` times (a single time when the ``retry`` keyword is
    missing or not positive). The ``retry`` keyword is read from the call's
    keyword arguments and is still forwarded to the wrapped method.

    Raises:
        The last exception raised by the wrapped method when every attempt
        failed.
    """

    @functools.wraps(func)
    def device_action(self, *args, **kwargs):
        if not self.get_recover_state():
            LOG.debug("Device %s %s is false" % (self.device_sn,
                                                 ConfigConst.recover_state))
            return "", "", ""

        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
        retry = tmp + 1 if tmp > 0 else 1
        exception = None
        for num in range(retry):
            try:
                return func(self, *args, **kwargs)
            except LiteDeviceTimeout as error:
                LOG.error(error)
                exception = error
                # NOTE(review): recovery is only triggered from the second
                # failed attempt onwards (num > 0); the first timeout is
                # simply retried. Kept as in the original - confirm intent.
                if num:
                    self.recover_device()
            except Exception as error:
                LOG.error(error)
                exception = error
        raise exception

    return device_action


@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
class DeviceLite(IDevice):
    """
    Class representing a lite device.

    Each object of this class represents one lite device in xDevice. The
    actual transport is delegated to a controller object (RemoteController
    for telnet, LocalController for serial ports) stored in ``self.device``.

    Attributes:
        device_sn: serial built from the configuration
            ("remote_<ip>_<port>" or "local_<com>")
        label: device label taken from the configuration
        device_connect_type: "local" or "remote", derived from the
            configuration by set_connect_type()
        device_kernel: kernel type set via __set_device_kernel__()
        device: controller instance created in __init_device__()
        ifconfig: optional command string read from the configuration and
            sent to the device after a reboot
    """
    device_os_type = DeviceOsType.lite
    device_allocation_state = DeviceAllocationState.available

    def __init__(self):
        self.device_sn = ""
        self.label = ""
        self.device_connect_type = ""
        self.device_kernel = ""
        self.device = None
        self.ifconfig = None
        self.device_id = None
        self.extend_value = {}
        # Guards reads/writes of the recover-state attribute.
        self.device_lock = threading.RLock()
        self.device_props = {}
        self.device_description = {}

    def init_description(self):
        """
        Fill ``device_description`` once; later calls are no-ops.
        """
        if self.device_description:
            return
        desc = {
            DeviceProperties.sn: convert_serial(self.device_sn),
            DeviceProperties.model: self.label,
            DeviceProperties.type_: self.label,
            DeviceProperties.platform: "OpenHarmony",
            DeviceProperties.version: "",
            DeviceProperties.others: self.device_props
        }
        self.device_description.update(desc)

    def __set_serial__(self, device=None):
        """
        Derive ``device_sn`` from the first entry that has either both
        "ip" and "port" keys or both "type" and "com" keys.

        Args:
            device: iterable of configuration dicts
        """
        for item in device:
            if "ip" in item and "port" in item:
                self.device_sn = "remote_%s_%s" % \
                                 (item.get("ip"), item.get("port"))
                break
            elif "type" in item and "com" in item:
                self.device_sn = "local_%s" % item.get("com")
                break

    def __get_serial__(self):
        """Return the device serial."""
        return self.device_sn

    def get(self, key=None, default=None):
        """
        Look up a value by name.

        A truthy instance attribute named ``key`` takes precedence;
        otherwise the value is looked up in ``extend_value``.

        Args:
            key: attribute / extend_value key; a falsy key returns default
            default: value returned when nothing is found
        """
        if not key:
            return default
        value = getattr(self, key, None)
        if value:
            return value
        return self.extend_value.get(key, default)

    def update_device_props(self, props):
        """
        Store device properties once.

        Does nothing when properties were already stored or when ``props``
        is not a dict.
        """
        if self.device_props or not isinstance(props, dict):
            return
        self.device_props.update(props)

    def __set_device_kernel__(self, kernel_type=""):
        """Record the kernel type of the device."""
        self.device_kernel = kernel_type

    def __get_device_kernel__(self):
        """Return the kernel type of the device."""
        return self.device_kernel

    @staticmethod
    def _check_watchgt(device):
        """
        Validate a watchGT configuration.

        Every entry must carry a "label" key and a non-empty "com" value,
        and that com (upper-cased) must appear in the output of the litehdc
        executable.

        NOTE(review): an entry without a "label" key is rejected, so this
        only accepts entries that hold label and com together - confirm
        against the configuration format.

        Returns:
            True as soon as the first entry is validated; None when
            ``device`` is empty.

        Raises:
            LiteParamError: on any validation failure.
        """
        for item in device:
            if "label" not in item:
                error_message = "watchGT local label does not exist"
                raise LiteParamError(error_message, error_no="00108")
            if not item.get("com"):
                error_message = "watchGT local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")

            hdc = get_hdc_path()
            result = exec_cmd([hdc])
            com_list = parse_available_com(result)
            if item.get("com").upper() in com_list:
                return True
            error_message = "watchGT local com does not exist"
            raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_wifiiot_config(device):
        """
        Validate a wifiiot configuration.

        Every entry without a "label" key must have a non-empty "com" and a
        non-empty "type", and at least two distinct types must be present.

        Raises:
            LiteParamError: on any validation failure.
        """
        com_type_set = set()
        for item in device:
            if "label" in item:
                continue
            if not item.get("com"):
                error_message = "wifiiot local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")

            # A present-but-empty type must be rejected as well as a
            # missing one.
            if not item.get("type"):
                error_message = "wifiiot com type cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            com_type_set.add(item.get("type"))
        if len(com_type_set) < 2:
            error_message = "wifiiot need cmd com and deploy com" \
                            " at the same time, please check"
            raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_ipcamera_local(device):
        """
        Validate a local ipcamera configuration: every entry without a
        "label" key must have a non-empty "com".

        Raises:
            LiteParamError: when a com value is missing or empty.
        """
        for item in device:
            if "label" not in item and not item.get("com"):
                error_message = "ipcamera local com cannot be " \
                                "empty, please check"
                raise LiteParamError(error_message, error_no="00108")

    @staticmethod
    def _check_ipcamera_remote(device=None):
        """
        Validate a remote ipcamera configuration: every entry without a
        "label" key must have a non-empty, numeric "port".

        Raises:
            LiteParamError: when a port is missing, empty or not numeric.
        """
        for item in device:
            if "label" in item:
                continue
            port = item.get("port")
            # An empty port would otherwise only fail later, in
            # RemoteController, with a bare ValueError from int().
            if not port:
                error_message = "ipcamera remote port cannot be" \
                                " empty, please check"
                raise LiteParamError(error_message, error_no="00108")
            if not str(port).isnumeric():
                error_message = "ipcamera remote port should be " \
                                "a number, please check"
                raise LiteParamError(error_message, error_no="00108")

    def __check_config__(self, device=None):
        """
        Determine label / connect type and run the matching validation.

        Raises:
            LiteParamError: when the configuration is invalid.
        """
        self.set_connect_type(device)
        if self.label == DeviceLabelType.wifiiot:
            self._check_wifiiot_config(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "local":
            self._check_ipcamera_local(device)
        elif self.label == DeviceLabelType.ipcamera and \
                self.device_connect_type == "remote":
            self._check_ipcamera_remote(device)
        elif self.label == DeviceLabelType.watch_gt:
            self._check_watchgt(device)

    def set_connect_type(self, device):
        """
        Read ``label`` and ``device_connect_type`` from the configuration.

        An entry with a "com" key marks the device as "local"; an entry
        with a valid IPv4 "ip" marks it as "remote". When both occur, the
        last one seen wins.

        Args:
            device: iterable of configuration dicts

        Raises:
            LiteParamError: when the ip is malformed, the label is missing
                or unsupported, or neither com nor ip is configured.
        """
        for item in device:
            if "label" in item:
                self.label = item.get("label")
            if "com" in item:
                self.device_connect_type = "local"
            if "ip" in item:
                # "or ''" turns a None ip into a format error instead of a
                # TypeError from the regex engine.
                if not _IPV4_PATTERN.match(item.get("ip") or ""):
                    error_message = "Remote device ip not in right " \
                                    "format, please check user_config.xml"
                    raise LiteParamError(error_message, error_no="00108")
                self.device_connect_type = "remote"

        if not self.label:
            error_message = "device label cannot be empty, " \
                            "please check"
            raise LiteParamError(error_message, error_no="00108")
        if self.label not in (DeviceLabelType.wifiiot,
                              DeviceLabelType.ipcamera,
                              DeviceLabelType.watch_gt):
            error_message = "device label should be ipcamera or" \
                            " wifiiot, please check"
            raise LiteParamError(error_message, error_no="00108")
        if not self.device_connect_type:
            error_message = "device com or ip cannot be empty, " \
                            "please check"
            raise LiteParamError(error_message, error_no="00108")

    def __init_device__(self, device):
        """
        Validate the configuration and create the matching controller.

        Args:
            device: list of configuration dicts; the "ifconfig" value is
                read from the second entry when there is one

        Raises:
            LiteParamError: when the configuration is invalid.
        """
        self.__check_config__(device)
        self.__set_serial__(device)
        if self.device_connect_type == "remote":
            self.device = CONTROLLER_DICT.get("remote")(device)
        else:
            self.device = CONTROLLER_DICT.get("local")(device)

        # Guard against single-entry configurations instead of failing
        # with an IndexError.
        self.ifconfig = device[1].get("ifconfig") if len(device) > 1 \
            else None

    def connect(self):
        """
        Connect the device

        In decc mode a connection failure additionally marks the device as
        unusable and sets its recover state to false.

        Raises:
            LiteDeviceConnectError: re-raised from the controller.
        """
        try:
            self.device.connect()
        except LiteDeviceConnectError:
            if check_mode(ModeType.decc):
                LOG.debug("Set device %s recover state to false" %
                          self.device_sn)
                self.device_allocation_state = DeviceAllocationState.unusable
                self.set_recover_state(False)
            raise

    @perform_device_action
    def execute_command_with_timeout(self, command="", case_type="",
                                     timeout=TIMEOUT, **kwargs):
        """Executes command on the device.

        Args:
            command: the command to execute
            case_type: CTest or CppTest
            timeout: timeout for read result
            **kwargs: receiver - parser handler input

        Returns:
            (filter_result, status, error_message)

            filter_result: command execution result
            status: true or false
            error_message: command execution error message

            ("", "", "") is returned by the decorator, without running the
            command, when the device's recover state is false.
        """
        receiver = kwargs.get("receiver", None)
        if self.device_connect_type == "remote":
            LOG.info("%s execute command shell %s with timeout %ss" %
                     (convert_serial(self.__get_serial__()), command,
                      str(timeout)))
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    timeout=timeout,
                    receiver=receiver)
        elif self.device_connect_type == "agent":
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver, type="cmd")
        else:
            filter_result, status, error_message = \
                self.device.execute_command_with_timeout(
                    command=command,
                    case_type=case_type,
                    timeout=timeout,
                    receiver=receiver)
        # Without a receiver nobody else consumes the output, so log it.
        if not receiver:
            LOG.debug("%s execute result:%s" % (
                convert_serial(self.__get_serial__()), filter_result))
        if not status:
            LOG.debug(
                "%s error_message:%s" % (convert_serial(self.__get_serial__()),
                                         error_message))
        return filter_result, status, error_message

    def recover_device(self):
        """Recover the device by rebooting it."""
        self.reboot()

    def reboot(self):
        """
        Reset the device and, when an ifconfig command is configured,
        re-apply it once a shell prompt is seen.

        In decc mode an empty result of the "reset" command marks the
        device as unusable and sets its recover state to false.
        """
        self.connect()
        filter_result, _, _ = self.execute_command_with_timeout(
            command="reset", timeout=30)
        if not filter_result:
            if check_mode(ModeType.decc):
                LOG.debug("Set device %s recover state to false" %
                          self.device_sn)
                self.device_allocation_state = DeviceAllocationState.unusable
                self.set_recover_state(False)
        if self.ifconfig:
            enter_result, _, _ = self.execute_command_with_timeout(
                command='\r', timeout=15)
            # "hisilicon #" itself contains " #", so it has to be tested
            # before the generic prompt check or the failure branch can
            # never be reached. "OHOS #" still takes precedence.
            if "OHOS #" not in enter_result and \
                    "hisilicon #" in enter_result:
                LOG.info("Reset device %s fail" % self.device_sn)
            elif " #" in enter_result:
                LOG.info("Reset device %s success" % self.device_sn)
                self.execute_command_with_timeout(command=self.ifconfig,
                                                  timeout=5)

            # Result is not inspected here; the call itself logs it.
            self.execute_command_with_timeout(command="ifconfig", timeout=5)

    def close(self):
        """
        Close the telnet connection with device server or close the local
        serial
        """
        self.device.close()

    def set_recover_state(self, state):
        """Set the recover state under the device lock."""
        with self.device_lock:
            setattr(self, ConfigConst.recover_state, state)

    def get_recover_state(self, default_state=True):
        """
        Return the recover state, or ``default_state`` when it was never
        set.
        """
        with self.device_lock:
            state = getattr(self, ConfigConst.recover_state, default_state)
            return state


class RemoteController:
    """
    Class representing a remote lite device.
    Each object of this class represents one remote lite device in xDevice,
    reached through a telnet connection.
    """

    def __init__(self, device):
        """
        Args:
            device: list of configuration dicts; "ip" and "port" are read
                from the second entry
        """
        self.host = device[1].get("ip")
        self.port = int(device[1].get("port"))
        self.telnet = None

    def connect(self):
        """
        Connect the device server

        An already opened connection is reused.

        Returns:
            The telnet connection object.

        Raises:
            LiteDeviceConnectError: when the connection cannot be opened.
        """
        if self.telnet:
            return self.telnet
        try:
            self.telnet = telnetlib.Telnet(self.host, self.port,
                                           timeout=TIMEOUT)
        except Exception as err_msgs:
            error_message = "Connect remote lite device failed, host is %s, " \
                            "port is %s, error is %s" % \
                            (convert_ip(self.host), self.port, str(err_msgs))
            raise LiteDeviceConnectError(error_message,
                                         error_no="00401") from err_msgs
        time.sleep(2)
        self.telnet.set_debuglevel(0)
        return self.telnet

    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
                                     receiver=None):
        """
        Executes command on the device.

        Parameters:
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler
        """
        return LiteHelper.execute_remote_cmd_with_timeout(
            self.telnet, command, timeout, receiver)

    def close(self):
        """
        Close the telnet connection with device server

        Errors while closing are logged, not raised. The connection handle
        is dropped in every case so a later connect() opens a fresh one.
        """
        if not self.telnet:
            return
        try:
            self.telnet.close()
        except Exception:
            error_message = "Remote device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")
        finally:
            self.telnet = None


class LocalController:
    """
    Controller of a locally attached lite device, holding one ComController
    per configured com type (cmd / deploy).
    """

    def __init__(self, device):
        """
        Init Local device.
        Parameters:
            device: local device
        """
        self.com_dict = {}
        for item in device:
            if "com" not in item:
                continue
            com_type = item.get("type")
            if com_type == ComType.cmd_com:
                self.com_dict[ComType.cmd_com] = ComController(item)
            elif com_type == ComType.deploy_com:
                self.com_dict[ComType.deploy_com] = ComController(item)

    def _get_controller(self, key):
        """
        Return the ComController registered for ``key``.

        Raises:
            LiteDeviceConnectError: when no com of that type is configured.
        """
        controller = self.com_dict.get(key)
        if controller is None:
            error_message = "local device com of type %s is not " \
                            "configured, please check" % key
            raise LiteDeviceConnectError(error_message, error_no="00401")
        return controller

    def connect(self, key=ComType.cmd_com):
        """
        Open serial.
        """
        self._get_controller(key).connect()

    def close(self, key=ComType.cmd_com):
        """
        Close serial.
        """
        controller = self.com_dict.get(key)
        if controller:
            controller.close()

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Recognised keyword arguments: key (com type, cmd com by default),
        command, case_type, receiver and timeout. Others are ignored.
        """
        key = kwargs.get("key", ComType.cmd_com)
        command = kwargs.get("command", None)
        case_type = kwargs.get("case_type", "")
        receiver = kwargs.get("receiver", None)
        timeout = kwargs.get("timeout", TIMEOUT)
        return self._get_controller(key).execute_command_with_timeout(
            command=command, case_type=case_type,
            timeout=timeout, receiver=receiver)


class ComController:
    """Controller of a single serial port."""

    def __init__(self, device):
        """
        Init serial.
        Parameters:
            device: local com
        """
        self.is_open = False
        self.com = None
        self.serial_port = device.get("com", None)
        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
        self.timeout = int(device.get("timeout", TIMEOUT))
        self.usb_port = device.get("usb_port", None)

    def connect(self):
        """
        Open serial.

        Does nothing when the port is already open.

        Raises:
            LiteDeviceConnectError: when the port cannot be opened (this
                includes a failing import of the serial module).
        """
        if self.is_open:
            return
        try:
            # Imported lazily so the module loads without pyserial.
            import serial
            self.com = serial.Serial(self.serial_port,
                                     baudrate=self.baud_rate,
                                     timeout=self.timeout)
            self.is_open = True
        except Exception as error_msg:
            error = "connect %s serial failed, please make sure this port is" \
                    " not occupied, error is %s[00401]" % \
                    (self.serial_port, str(error_msg))
            raise LiteDeviceConnectError(error,
                                         error_no="00401") from error_msg

    def close(self):
        """
        Close serial.

        Errors while closing are logged, not raised. The open flag is
        cleared in every case.
        """
        if not self.com:
            return
        try:
            if self.is_open:
                self.com.close()
        except Exception:
            error_message = "Local device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")
        finally:
            self.is_open = False

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.
        """
        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)

    def execute_command(self, command):
        """
        Execute command on the serial and read all the output from the serial.
        """
        LiteHelper.execute_local_command(self.com, command)


# Maps a connect type to the controller class used by
# DeviceLite.__init_device__().
CONTROLLER_DICT = {
    "local": LocalController,
    "remote": RemoteController,
}