1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import time 20import threading 21 22from xdevice import UserConfigManager 23from xdevice import DeviceOsType 24from xdevice import ManagerType 25from xdevice import DeviceAllocationState 26from xdevice import Plugin 27from xdevice import get_plugin 28from xdevice import IDeviceManager 29from xdevice import platform_logger 30from xdevice import convert_ip 31from xdevice import convert_port 32from xdevice import convert_serial 33 34from ohos.exception import LiteDeviceError 35 36__all__ = ["ManagerLite"] 37 38LOG = platform_logger("ManagerLite") 39 40 41@Plugin(type=Plugin.MANAGER, id=ManagerType.lite_device) 42class ManagerLite(IDeviceManager): 43 """ 44 Class representing device manager that 45 managing the set of available devices for testing 46 """ 47 48 def __init__(self): 49 self.devices_list = [] 50 self.list_con = threading.Condition() 51 self.support_labels = ["ipcamera", "wifiiot", "watchGT"] 52 self.support_types = ["device"] 53 54 def init_environment(self, environment="", user_config_file=""): 55 device_lite = get_plugin(plugin_type=Plugin.DEVICE, 56 plugin_id=DeviceOsType.lite)[0] 57 58 devices = UserConfigManager( 59 config_file=user_config_file, env=environment).get_com_device( 60 "environment/device") 61 62 for device in devices: 63 try: 64 device_lite_instance = device_lite.__class__() 65 device_lite_instance.__init_device__(device) 66 device_lite_instance.device_allocation_state = \ 67 DeviceAllocationState.available 68 except LiteDeviceError as exception: 69 LOG.warning(exception) 70 continue 71 72 self.devices_list.append(device_lite_instance) 73 74 def env_stop(self): 75 pass 76 77 def apply_device(self, device_option, timeout=10): 78 """ 79 Request a device for testing that meets certain criteria. 80 """ 81 del timeout 82 LOG.debug("Lite apply device: apply lock") 83 self.list_con.acquire() 84 try: 85 allocated_device = None 86 for device in self.devices_list: 87 if device_option.matches(device): 88 device.device_allocation_state = \ 89 DeviceAllocationState.allocated 90 LOG.debug("Allocate device sn: %s, type: %s" % ( 91 convert_serial(device.__get_serial__()), 92 device.__class__)) 93 return device 94 time.sleep(10) 95 return allocated_device 96 finally: 97 LOG.debug("Lite apply device: release lock") 98 self.list_con.release() 99 100 def release_device(self, device): 101 LOG.debug("Lite release device: apply lock") 102 self.list_con.acquire() 103 try: 104 if device.device_allocation_state == \ 105 DeviceAllocationState.allocated: 106 device.device_allocation_state = \ 107 DeviceAllocationState.available 108 LOG.debug("Free device sn: %s, type: %s" % ( 109 device.__get_serial__(), device.__class__)) 110 finally: 111 LOG.debug("Lite release device: release lock") 112 self.list_con.release() 113 114 def reset_device(self, device): 115 pass 116 117 def list_devices(self): 118 print("Lite devices:") 119 print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}". 120 format("SerialPort/IP", "Baudrate/Port", "OsType", "Allocation", 121 "Product", "ConnectType", "ComType")) 122 for device in self.devices_list: 123 if device.device_connect_type == "remote" or \ 124 device.device_connect_type == "agent": 125 print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}".format( 126 convert_ip(device.device.host), 127 convert_port(device.device.port), 128 device.device_os_type, 129 device.device_allocation_state, 130 device.label, 131 device.device_connect_type)) 132 else: 133 for com_controller in device.device.com_dict: 134 print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}". 135 format(convert_port(device.device.com_dict[ 136 com_controller].serial_port), 137 device.device.com_dict[ 138 com_controller].baud_rate, 139 device.device_os_type, 140 device.device_allocation_state, 141 device.label, 142 device.device_connect_type, 143 com_controller)) 144