#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import time
import sys

from abc import abstractmethod
from abc import ABCMeta
from xml.etree import ElementTree
from xml.etree.ElementTree import Element


from _core.interface import IFilter
from _core.interface import IDeviceManager
from _core.logger import platform_logger
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.utils import convert_serial
from _core.utils import get_cst_time
from _core.logger import change_logger_level
from _core.constants import ConfigConst
from _core.constants import FilePermission
from _core.context.log import RuntimeLogs

LOG = platform_logger("EnvPool")

__all__ = ["EnvPool", "XMLNode", "Selector", "DeviceSelector", "DeviceNode", "is_env_pool_run_mode"]


class EnvPool(object):
    """
    Class representing environment pool that
    managing the set of available devices for testing.
    this class is used directly by users without going through the command flow.

    The class is a singleton: ``__new__`` always hands back the same object
    until ``shutdown`` resets it.
    """
    instance = None
    __init_flag = False
    report_path = None
    resource_path = None

    def __new__(cls, *args, **kwargs):
        """
        Singleton instance
        """
        del args, kwargs
        if cls.instance is None:
            cls.instance = super(EnvPool, cls).__new__(cls)
        return cls.instance

    def __init__(self, **kwargs):
        """Initialise the pool.

        Recognised keyword arguments:
            report_path: folder for the task log; a timestamped folder under
                ``<cwd>/reports`` is created when it is empty or missing.
            log_level: console log level, "debug" or "info" (default "info").
            resource_path: resource folder, default ``<cwd>/resource``.

        The task log is restarted on every construction; everything after the
        ``__init_flag`` check runs only for the first construction (or the
        first one after the flag has been reset).
        """
        EnvPool.report_path = kwargs.get("report_path", "")
        self._stop_task_log()
        self._start_task_log()
        if EnvPool.__init_flag:
            return
        self._managers = {}
        self._filters = {}
        self._init_log_level(kwargs.get("log_level", "info"))
        self._load_managers()
        EnvPool.__init_flag = True
        EnvPool.resource_path = kwargs.get(
            "resource_path",
            os.path.join(os.path.abspath(os.getcwd()), "resource"))
        setattr(sys, "ecotest_resource_path", EnvPool.resource_path)
        # init cache file and check if expire
        cache_file = Cache()
        cache_file.check_cache_if_expire()
        self.devices = list()

    def _load_managers(self):
        """Populate ``self._managers``.

        Managers already held by an initialised ``EnvironmentManager`` are
        reused; otherwise one instance per manager plugin is created. A plugin
        that fails to instantiate is logged and skipped.
        """
        LOG.info("Load Managers ...")
        from xdevice import EnvironmentManager
        if EnvironmentManager._EnvironmentManager__init_flag:
            LOG.info("Managers has inited, using EnvironmentManager managers...")
            self._managers = EnvironmentManager._EnvironmentManager__instance.managers
            return
        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            try:
                manager_instance = manager_plugin.__class__()
                self._managers[manager_instance.__class__.__name__] = manager_instance
            except Exception as error:
                LOG.error("Pool start error: {}".format(error))
        # Sort by class name in reverse order so that OH devices are loaded first
        if self._managers:
            self._managers = dict(sorted(self._managers.items(), reverse=True))

    def _unload_manager(self):
        """Stop every manager that ``init_pool`` registered, then drop all managers."""
        for manager in self._managers.values():
            # managers that were never initialised by init_pool are left alone
            if manager.__class__.__name__ not in self._filters:
                continue
            manager.devices_list.clear()
            manager.env_stop()
        self._managers.clear()
        EnvPool.__init_flag = False

    def get_device(self, selector, timeout=10):
        """Apply for a device matching ``selector``.

        Args:
            selector: a ``Selector`` describing the wanted device.
            timeout: passed through to the manager's ``apply_device``.

        Returns:
            The device, which is also remembered in ``self.devices``, or
            ``None`` when no manager could provide one.
        """
        LOG.info("Get device by selector")
        device = self._apply_device(selector, timeout)
        if device is not None:
            LOG.info("Device {}: extend value: {}".format(
                convert_serial(device.device_sn), device.extend_value))
            self.devices.append(device)
        else:
            LOG.info("Require label is '{}', can't get device".
                     format(selector.label))
        return device

    def init_pool(self, node):
        """Initialise every loaded manager that accepts ``node``.

        A manager is used only if it is an ``IFilter`` whose
        ``__filter_xml_node__`` accepts the node and it is also an
        ``IDeviceManager``. Accepted managers are recorded in
        ``self._filters`` keyed by class name.

        Args:
            node: an ``XMLNode``; its ``format()`` output is handed to
                ``init_environment``.
        """
        LOG.info("Prepare to init pool")
        for manager in self._managers.values():
            if not isinstance(manager, IFilter):
                continue
            if not manager.__filter_xml_node__(node):
                continue
            if not isinstance(manager, IDeviceManager):
                continue
            manager.init_environment(node.format(), "")
            self._filters[manager.__class__.__name__] = manager
        LOG.info("Pool is prepared")
        if not self._filters:
            # the separating space was missing between the two literals
            LOG.info("Can't find any manager, may be no connector are assign "
                     "or the plugins of manager are not installed!")

    def shutdown(self):
        """Release device ports, stop the managers and the task log, reset the singleton."""
        # clear device rpc port
        for device in self.devices:
            if hasattr(device, "remove_ports"):
                device.remove_ports()
        self._unload_manager()
        self._stop_task_log()
        EnvPool.instance = None
        EnvPool.__init_flag = False

    def _apply_device(self, selector, timeout=3):
        """Ask each registered manager in turn for a device matching ``selector``.

        A manager is skipped when it rejects the selector, does not list the
        required manager type in ``support_types``, has no ``support_labels``,
        or does not support the requested label. A request without a label
        (``None``) is only served by the manager named "ManagerDevice".

        Returns:
            The first truthy device returned by a manager, else ``None``.
        """
        LOG.info("Apply device in pool")
        for manager_type, manager in self._filters.items():
            if not manager.__filter_selector__(selector):
                continue
            device_option = selector.format()
            if not device_option:
                continue
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.info("'{}' not in {}'s support types".format(
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                if manager_type != "ManagerDevice":
                    continue
            elif device_option.label not in support_labels:
                continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        return None

    @classmethod
    def _init_log_level(cls, level):
        """Set the console log level; anything other than 'debug'/'info' is ignored."""
        if str(level).lower() not in ["debug", "info"]:
            LOG.info("Level str must be 'debug' or 'info'")
            return
        change_logger_level({"console": level})

    @classmethod
    def _start_task_log(cls):
        """Resolve (and create if needed) the report folder and start the task log in it."""
        report_folder_path = EnvPool.report_path
        if not report_folder_path:
            # fall back to <cwd>/reports/<timestamp>
            report_folder_path = os.path.join(
                os.path.abspath(os.getcwd()), "reports",
                get_cst_time().strftime("%Y-%m-%d-%H-%M-%S"))
        if not os.path.exists(report_folder_path):
            os.makedirs(report_folder_path)
        LOG.info("Report path: {}".format(report_folder_path))
        EnvPool.report_path = report_folder_path
        RuntimeLogs.start_task_log(report_folder_path)

    @classmethod
    def _stop_task_log(cls):
        """Stop the running task log."""
        RuntimeLogs.stop_task_logcat()


class XMLNode(metaclass=ABCMeta):
    """Base class that builds the ``user_config`` XML describing one device."""

    @abstractmethod
    def __init__(self):
        self.__device_ele = Element("device")
        self.__connectors = []

    @abstractmethod
    def __on_root_attrib__(self, attrib_dict):
        """Hook: fill ``attrib_dict`` with the attributes of the ``device`` element."""
        pass

    def add_element_string(self, element_str=""):
        """Replace the device element with the one parsed from ``element_str``.

        The string is ignored when it is empty or its root tag is not
        ``device``. Returns ``self`` for chaining.
        """
        if element_str:
            device_ele = ElementTree.fromstring(element_str)
            if device_ele.tag == "device":
                self.__device_ele = device_ele
        return self

    @classmethod
    def create_node(cls, tag):
        """Return a new, empty XML element named ``tag``."""
        return Element(tag)

    def build_connector(self, connector_name):
        """Remember ``connector_name``; returns ``self`` for chaining."""
        self.__connectors.append(connector_name)
        return self

    def get_connectors(self):
        """Return the list of connector names added so far."""
        return self.__connectors

    def format(self):
        """Serialise the node as ``<user_config><environment><device .../>``.

        The device element's attributes are replaced by whatever
        ``__on_root_attrib__`` supplies.

        Returns:
            The UTF-8 encoded XML as ``bytes``.
        """
        attrib_dict = dict()
        self.__on_root_attrib__(attrib_dict)
        self.__device_ele.attrib = attrib_dict
        env = self.create_node("environment")
        env.append(self.__device_ele)
        root = self.create_node("user_config")
        root.append(env)
        return ElementTree.tostring(root, encoding="utf-8")

    def get_root_node(self):
        """Return the ``device`` element."""
        return self.__device_ele


class Selector(metaclass=ABCMeta):
    """Base class describing which device a caller wants from the pool."""

    @abstractmethod
    def __init__(self, _type, label):
        self.__device_dict = dict()
        self.__config = dict()
        self.label = label
        self.type = _type

    def add_environment_content(self, content):
        """Merge extra device requirements into the selector.

        Args:
            content: a dict, a list whose first item is a dict, or a JSON
                string holding either of those. Any other type is ignored.

        Returns:
            ``self`` for chaining.

        Raises:
            RuntimeError: for a string that is neither a JSON list nor a
                JSON object.
        """
        _content = content
        if isinstance(_content, str):
            _content = _content.strip()
            if _content.startswith("[") and _content.endswith("]"):
                self.__device_dict.update(json.loads(_content)[0])
            elif _content.startswith("{") and _content.endswith("}"):
                self.__device_dict.update(json.loads(_content))
            else:
                raise RuntimeError("Invalid str input! ['{}']".format(_content))
        elif isinstance(_content, list):
            self.__device_dict.update(_content[0])
        elif isinstance(_content, dict):
            self.__device_dict.update(_content)
        return self

    @abstractmethod
    def __on_config__(self, config, device_dict):
        """Hook: fill ``config`` (it must provide the "device_sn" entry)."""
        pass

    @abstractmethod
    def __on_selection_option__(self, selection_option):
        """Hook: adjust the freshly built ``SelectionOption``."""
        pass

    def add_label(self, label):
        """Set the required label; returns ``self`` for chaining."""
        self.label = label
        return self

    def add_type(self, _type):
        """Set the required manager type; returns ``self`` for chaining."""
        self.type = _type
        return self

    def format(self):
        """Build a ``SelectionOption`` from the collected requirements.

        "type" and "label" are taken out of the collected content (defaults
        "device" and "phone"); whatever remains becomes the option's
        ``extend_value``. The collected content and config are cleared
        afterwards.

        Returns:
            The new ``SelectionOption``.
        """
        if self.type or self.label:
            self.__device_dict.update({"type": self.type})
            self.__device_dict.update({"label": self.label})
        self.__on_config__(self.__config, self.__device_dict)
        label = self.__device_dict.get("label", "phone")
        required_manager = self.__device_dict.get("type", "device")
        device_option = SelectionOption(self.__config, label)
        self.__device_dict.pop("type", None)
        self.__device_dict.pop("label", None)
        device_option.required_manager = required_manager
        # Hand out a copy: the selector's own dict is cleared below, which
        # would otherwise empty the option's extend_value as well.
        device_option.extend_value = dict(self.__device_dict)
        device_option.env_index = 1
        self.__on_selection_option__(device_option)
        self.__device_dict.clear()
        self.__config.clear()
        return device_option


class SelectionOption:
    """Criteria a device has to meet to be handed out."""

    def __init__(self, options, label=None):
        """
        Args:
            options: mapping with a "device_sn" entry holding serial numbers
                separated by ";" (empty parts are dropped).
            label: required device label.
        """
        self.device_sn = [x for x in options["device_sn"].split(";") if x]
        self.label = label
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.env_index = None

    def get_label(self):
        """Return the required label."""
        return self.label

    def get_env_index(self):
        """Return the environment index assigned to this option."""
        return self.env_index

    def matches(self, device):
        """Tell whether ``device`` satisfies this option.

        A device is rejected when its ``task_state`` attribute is falsy
        (a missing attribute counts as ``True``) or when serial numbers were
        requested and the device's serial is not among them.
        """
        LOG.info("Do matches, device:[state:{}, sn:{}, label:{}], selection "
                 "option:[device sn:{}, label:{}]".format(
                     device.device_allocation_state,
                     convert_serial(device.device_sn),
                     device.label,
                     [convert_serial(sn) if sn else "" for sn in self.device_sn],
                     self.label))
        if not getattr(device, "task_state", True):
            return False

        if len(self.device_sn) != 0 and device.device_sn not in self.device_sn:
            return False

        return True


class DeviceNode(XMLNode):
    """XML node for a device, carrying ``sn``, ``ip`` and ``port`` children."""

    def __init__(self, usb_type, label=""):
        """
        Args:
            usb_type: written as the ``type`` attribute of the device element.
            label: written as the ``label`` attribute when non-empty.
        """
        super().__init__()
        self.usb_type = usb_type
        self.label = label
        self.get_root_node().append(self.create_node("sn"))
        self.get_root_node().append(self.create_node("ip"))
        self.get_root_node().append(self.create_node("port"))

    def __on_root_attrib__(self, attrib_dict):
        attrib_dict.update({"type": self.usb_type})
        if self.label:
            attrib_dict.update({"label": self.label})

    def add_address(self, host, port):
        """Set the ``ip`` and ``port`` child elements; returns ``self``.

        Non-``None`` values are stored as strings because element text that
        is not a string cannot be serialised by ``format()``.
        """
        host_ele = self.get_root_node().find("ip")
        port_ele = self.get_root_node().find("port")
        host_ele.text = None if host is None else str(host)
        port_ele.text = None if port is None else str(port)
        return self

    def add_device_sn(self, device_sn):
        """Append ``device_sn`` to the ``sn`` element (";"-separated); returns ``self``."""
        sn_ele = self.get_root_node().find("sn")
        if sn_ele.text:
            sn_ele.text = "{};{}".format(sn_ele.text, device_sn)
        else:
            sn_ele.text = device_sn
        return self


class DeviceSelector(Selector):
    """Selector that can additionally pin the device serial number(s)."""

    def __init__(self, _type="", label=""):
        super().__init__(_type, label)
        self.device_sn = ""

    def __on_config__(self, config, device_dict):
        config.update({"device_sn": self.device_sn})

    def __on_selection_option__(self, selection_option):
        pass

    def add_device_sn(self, device_sn):
        """Set the wanted serial number(s), ";"-separated; returns ``self``."""
        self.device_sn = device_sn
        return self


class Cache:
    """Marker file whose modification time decides whether cache mode is used."""

    def __init__(self):
        from xdevice import Variables
        self.cache_file = os.path.join(Variables.temp_dir, "cache.dat")
        self.expire_time = 1  # days

    def check_cache_if_expire(self):
        """Set the ``ConfigConst.env_pool_cache`` attribute on ``sys``.

        The attribute is ``True`` when the cache file exists and was modified
        less than ``expire_time`` whole days ago. Otherwise the file is
        refreshed and the attribute is set to ``False``.
        """
        if os.path.exists(self.cache_file):
            current_modify_time = os.path.getmtime(self.cache_file)
            current_time = time.time()
            if Cache.get_delta_days(current_modify_time, current_time) < self.expire_time:
                setattr(sys, ConfigConst.env_pool_cache, True)
                LOG.info("Env pool running in cache mode.")
                return
        self.update_cache()
        setattr(sys, ConfigConst.env_pool_cache, False)
        LOG.info("Env pool running in normal mode.")

    @staticmethod
    def get_delta_days(t1, t2):
        """Return the number of whole days from timestamp ``t1`` to timestamp ``t2``."""
        import datetime
        dt2 = datetime.datetime.fromtimestamp(t2)
        dt1 = datetime.datetime.fromtimestamp(t1)
        return (dt2 - dt1).days

    def update_cache(self):
        """Create the cache file if needed and append a few bytes to it."""
        flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        with os.fdopen(os.open(self.cache_file, flags, FilePermission.mode_755),
                       "wb") as f:
            f.write(b'123')  # write byte data


def is_env_pool_run_mode():
    """Return ``True`` while an ``EnvPool`` instance exists."""
    return EnvPool.instance is not None