1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import json 19import os 20import time 21import datetime 22import sys 23 24from abc import abstractmethod 25from abc import ABCMeta 26from xml.etree import ElementTree 27from xml.etree.ElementTree import Element 28 29from _core.interface import IFilter 30from _core.interface import IDeviceManager 31from _core.logger import platform_logger 32from _core.plugin import Plugin 33from _core.plugin import get_plugin 34from _core.utils import convert_serial 35from _core.logger import change_logger_level 36from _core.constants import ConfigConst 37from _core.constants import FilePermission 38 39LOG = platform_logger("EnvPool") 40 41__all__ = ["EnvPool", "XMLNode", "Selector", "DeviceSelector", "DeviceNode"] 42 43 44class EnvPool(object): 45 """ 46 Class representing environment pool that 47 managing the set of available devices for testing. 48 this class is used directly by users without going through the command flow. 49 """ 50 __instance = None 51 __init_flag = False 52 53 def __new__(cls, *args, **kwargs): 54 """ 55 Singleton instance 56 """ 57 del args, kwargs 58 if cls.__instance is None: 59 cls.__instance = super(EnvPool, cls).__new__(cls) 60 return cls.__instance 61 62 def __init__(self, log_level="info"): 63 if EnvPool.__init_flag: 64 return 65 self._managers = {} 66 self._filters = {} 67 self._init_log_level(log_level) 68 self._load_managers() 69 EnvPool.__init_flag = True 70 # init cache file and check if expire 71 cache_file = Cache() 72 cache_file.check_cache_if_expire() 73 self.devices = list() 74 75 def _load_managers(self): 76 LOG.info("Load Managers ...") 77 manager_plugins = get_plugin(Plugin.MANAGER) 78 for manager_plugin in manager_plugins: 79 try: 80 manager_instance = manager_plugin.__class__() 81 self._managers[manager_instance.__class__.__name__] = \ 82 manager_instance 83 except Exception as error: 84 LOG.error("Pool start error: {}".format(error)) 85 # reverse sort 86 if self._managers: 87 self._managers = dict(sorted(self._managers.items(), reverse=True)) 88 89 def _unload_manager(self): 90 for manager in self._managers.values(): 91 if manager.__class__.__name__ not in self._filters: 92 continue 93 manager.devices_list = [] 94 self._managers = {} 95 EnvPool.__init_flag = False 96 97 def get_device(self, selector, timeout=10): 98 LOG.info("Get device by selector") 99 device = self._apply_device(selector, timeout) 100 if device is not None: 101 LOG.info("Device {}: extend value: {}".format( 102 convert_serial(device.device_sn), device.extend_value)) 103 self.devices.append(device) 104 else: 105 LOG.info("Require label is '{}', can't get device".format(selector.label)) 106 return device 107 108 def init_pool(self, node): 109 LOG.info("Prepare to init pool") 110 for manager in self._managers.values(): 111 if not isinstance(manager, IFilter): 112 continue 113 if not manager.__filter_xml_node__(node): 114 continue 115 if not isinstance(manager, IDeviceManager): 116 continue 117 manager.init_environment(node.format(), "") 118 self._filters[manager.__class__.__name__] = manager 119 LOG.info("Pool is prepared") 120 if not self._filters: 121 LOG.info("Can't find any manager, may be no connector are assign" 122 "or the plugins of manager are not installed!") 123 124 def shutdown(self): 125 # clear device rpc port 126 for device in self.devices: 127 if hasattr(device, "remove_ports"): 128 device.remove_ports() 129 self._unload_manager() 130 131 def _apply_device(self, selector, timeout=10): 132 LOG.info("Apply device in pool") 133 for manager_type, manager in self._filters.items(): 134 if not manager.__filter_selector__(selector): 135 continue 136 device_option = selector.format() 137 if not device_option: 138 continue 139 support_labels = getattr(manager, "support_labels", []) 140 support_types = getattr(manager, "support_types", []) 141 if device_option.required_manager not in support_types: 142 LOG.info("'{}' not in {}'s support types".format( 143 device_option.required_manager, manager_type)) 144 continue 145 if not support_labels: 146 continue 147 if device_option.label is None: 148 if manager_type != "ManagerDevice": 149 continue 150 else: 151 if support_labels and \ 152 device_option.label not in support_labels: 153 continue 154 device = manager.apply_device(device_option, timeout) 155 if hasattr(device, "env_index"): 156 device.env_index = device_option.get_env_index() 157 if device: 158 return device 159 else: 160 return None 161 162 @classmethod 163 def _init_log_level(cls, level): 164 if str(level).lower() not in ["debug", "info"]: 165 LOG.info("Level str must be 'debug or 'info") 166 return 167 change_logger_level({"console": level}) 168 169 170class XMLNode(metaclass=ABCMeta): 171 172 @abstractmethod 173 def __init__(self): 174 self.__device_ele = Element("device") 175 self.__connectors = [] 176 177 @abstractmethod 178 def __on_root_attrib__(self, attrib_dict): 179 pass 180 181 def add_element_string(self, element_str=""): 182 if element_str: 183 device_ele = ElementTree.fromstring(element_str) 184 if device_ele.tag == "device": 185 self.__device_ele = device_ele 186 return self 187 188 @classmethod 189 def create_node(cls, tag): 190 return Element(tag) 191 192 def build_connector(self, connector_name): 193 self.__connectors.append(connector_name) 194 return self 195 196 def get_connectors(self): 197 return self.__connectors 198 199 def format(self): 200 attrib_dict = dict() 201 self.__on_root_attrib__(attrib_dict) 202 self.__device_ele.attrib = attrib_dict 203 env = self.create_node("environment") 204 env.append(self.__device_ele) 205 root = self.create_node("user_config") 206 root.append(env) 207 return ElementTree.tostring(root, encoding="utf-8") 208 209 def get_root_node(self): 210 return self.__device_ele 211 212 213class Selector(metaclass=ABCMeta): 214 215 @abstractmethod 216 def __init__(self, _type, label): 217 self.__device_dict = dict() 218 self.__config = dict() 219 self.label = label 220 self.type = _type 221 222 def add_environment_content(self, content): 223 _content = content 224 if isinstance(_content, str): 225 _content = _content.strip() 226 if _content.startswith("[") and _content.endswith("]"): 227 self.__device_dict.update(json.loads(_content)[0]) 228 elif _content.startswith("{") and _content.endswith("}"): 229 self.__device_dict.update(json.loads(content)) 230 else: 231 raise RuntimeError("Invalid str input! ['{}']".format(_content)) 232 elif isinstance(_content, list): 233 self.__device_dict.update(_content[0]) 234 elif isinstance(_content, dict): 235 self.__device_dict.update(_content) 236 return self 237 238 @abstractmethod 239 def __on_config__(self, config, device_dict): 240 pass 241 242 @abstractmethod 243 def __on_selection_option__(self, selection_option): 244 pass 245 246 def add_label(self, label): 247 self.label = label 248 return self 249 250 def add_type(self, _type): 251 self.type = _type 252 return self 253 254 def format(self): 255 if self.type or self.label: 256 self.__device_dict.update({"type": self.type}) 257 self.__device_dict.update(({"label": self.label})) 258 self.__on_config__(self.__config, self.__device_dict) 259 index = 1 260 label = self.__device_dict.get("label", "phone") 261 required_manager = self.__device_dict.get("type", "device") 262 device_option = SelectionOption(self.__config, label) 263 self.__device_dict.pop("type", None) 264 self.__device_dict.pop("label", None) 265 device_option.required_manager = required_manager 266 device_option.extend_value = self.__device_dict 267 if hasattr(device_option, "env_index"): 268 device_option.env_index = index 269 index += 1 270 self.__on_selection_option__(device_option) 271 self.__device_dict.clear() 272 self.__config.clear() 273 return device_option 274 275 276class SelectionOption: 277 def __init__(self, options, label=None): 278 self.device_sn = [x for x in options["device_sn"].split(";") if x] 279 self.label = label 280 self.source_file = "" 281 self.extend_value = {} 282 self.required_manager = "" 283 self.env_index = None 284 285 def get_label(self): 286 return self.label 287 288 def get_env_index(self): 289 return self.env_index 290 291 def matches(self, device): 292 LOG.debug("Do matches, device:{state:%s, sn:%s, label:%s}, selection " 293 "option:{device sn:%s, label:%s}" % ( 294 device.device_allocation_state, 295 convert_serial(device.device_sn), 296 device.label, 297 [convert_serial(sn) if sn else "" for sn in self.device_sn], 298 self.label)) 299 if not getattr(device, "task_state", True): 300 return False 301 302 if len(self.device_sn) != 0 and device.device_sn not in self.device_sn: 303 return False 304 305 return True 306 307 308class DeviceNode(XMLNode): 309 310 def __init__(self, usb_type, label=""): 311 super().__init__() 312 self.usb_type = usb_type 313 self.label = label 314 self.get_root_node().append(self.create_node("sn")) 315 self.get_root_node().append(self.create_node("ip")) 316 self.get_root_node().append(self.create_node("port")) 317 318 def __on_root_attrib__(self, attrib_dict): 319 attrib_dict.update({"type": self.usb_type}) 320 if self.label: 321 attrib_dict.update({"label": self.label}) 322 323 def add_address(self, host, port): 324 host_ele = self.get_root_node().find("ip") 325 port_ele = self.get_root_node().find("port") 326 host_ele.text = host 327 port_ele.text = port 328 329 def add_device_sn(self, device_sn): 330 sn_ele = self.get_root_node().find("sn") 331 if sn_ele.text: 332 sn_ele.text = "{};{}".format(sn_ele.text, device_sn) 333 else: 334 sn_ele.text = device_sn 335 return self 336 337 338class DeviceSelector(Selector): 339 340 def __init__(self, _type="", label=""): 341 super().__init__(_type, label) 342 self.device_sn = "" 343 344 def __on_config__(self, config, device_dict): 345 config.update({"device_sn": self.device_sn}) 346 347 def __on_selection_option__(self, selection_option): 348 pass 349 350 def add_device_sn(self, device_sn): 351 self.device_sn = device_sn 352 return self 353 354 355class Cache: 356 def __init__(self): 357 from xdevice import Variables 358 self.cache_file = os.path.join(Variables.res_dir, "cache.dat") 359 self.expire_time = 1 # days 360 361 def check_cache_if_expire(self): 362 if os.path.exists(self.cache_file): 363 current_modify_time = os.path.getmtime(self.cache_file) 364 current_time = time.time() 365 if Cache.get_delta_days(current_modify_time, current_time) < self.expire_time: 366 setattr(sys, ConfigConst.env_pool_cache, True) 367 LOG.info("Env pool ruuning in cache mode.") 368 return 369 self.update_cache() 370 setattr(sys, ConfigConst.env_pool_cache, False) 371 LOG.info("Env pool running in normal mode.") 372 373 @staticmethod 374 def get_delta_days(t1, t2): 375 dt2 = datetime.datetime.fromtimestamp(t2) 376 dt1 = datetime.datetime.fromtimestamp(t1) 377 return (dt2 - dt1).days 378 379 def update_cache(self): 380 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | os.O_BINARY 381 with os.fdopen(os.open(self.cache_file, flags, FilePermission.mode_755), 382 "wb") as f: 383 f.write(b'123') 384