#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import time
import sys

from abc import abstractmethod
from abc import ABCMeta
from xml.etree import ElementTree
from xml.etree.ElementTree import Element


from _core.interface import IFilter
from _core.interface import IDeviceManager
from _core.logger import platform_logger
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.utils import convert_serial
from _core.utils import get_cst_time
from _core.logger import change_logger_level
from _core.constants import ConfigConst
from _core.constants import FilePermission
from _core.executor.scheduler import Scheduler

LOG = platform_logger("EnvPool")

__all__ = ["EnvPool", "XMLNode", "Selector", "DeviceSelector", "DeviceNode", "is_env_pool_run_mode"]


class EnvPool(object):
    """
    Environment pool that manages the set of devices available for testing.

    This class is meant to be used directly by users, without going through
    the command flow. It is a process-wide singleton: every construction
    returns the same object, and the managers are only loaded on the first
    construction (or on the first one after ``shutdown()``).
    """
    instance = None
    __init_flag = False
    report_path = None
    resource_path = None

    def __new__(cls, *args, **kwargs):
        """
        Return the singleton instance, creating it on first use.
        """
        del args, kwargs
        if cls.instance is None:
            cls.instance = super(EnvPool, cls).__new__(cls)
        return cls.instance

    def __init__(self, **kwargs):
        """
        Initialise the pool.

        Supported keyword arguments:
            report_path: folder for the task log; when empty a time-stamped
                folder under ``<cwd>/reports`` is used.
            log_level: console log level, "debug" or "info" (default "info").
            resource_path: resource folder, default ``<cwd>/resource``.

        The task log is restarted on every construction; the remaining
        set-up only runs while the pool is not yet initialised.
        """
        EnvPool.report_path = kwargs.get("report_path", "")
        self._stop_task_log()
        self._start_task_log()
        if EnvPool.__init_flag:
            return
        self._managers = {}
        self._filters = {}
        self._init_log_level(kwargs.get("log_level", "info"))
        self._load_managers()
        EnvPool.__init_flag = True
        EnvPool.resource_path = kwargs.get(
            "resource_path",
            os.path.join(os.path.abspath(os.getcwd()), "resource"))
        setattr(sys, "ecotest_resource_path", EnvPool.resource_path)
        # init cache file and check if expire
        cache_file = Cache()
        cache_file.check_cache_if_expire()
        self.devices = list()

    def _load_managers(self):
        """
        Instantiate every manager plugin and index it by class name.

        A plugin that fails to instantiate is logged and skipped so that one
        broken plugin does not prevent the others from loading.
        """
        LOG.info("Load Managers ...")
        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            try:
                manager_instance = manager_plugin.__class__()
                self._managers[manager_instance.__class__.__name__] = \
                    manager_instance
            except Exception as error:
                LOG.error("Pool start error: {}".format(error))
        # reverse sort
        if self._managers:
            self._managers = dict(sorted(self._managers.items(), reverse=True))

    def _unload_manager(self):
        """
        Drop all managers and mark the pool as uninitialised.

        Only managers that were activated by ``init_pool`` get their
        ``devices_list`` reset.
        """
        for manager in self._managers.values():
            if manager.__class__.__name__ not in self._filters:
                continue
            manager.devices_list = []
        self._managers = {}
        EnvPool.__init_flag = False

    def get_device(self, selector, timeout=10):
        """
        Request a device matching ``selector``.

        Args:
            selector: a ``Selector`` describing the wanted device.
            timeout: value forwarded to the manager's ``apply_device``.

        Returns:
            The device handed out by a manager, or None when no manager
            could provide one. Returned devices are remembered in
            ``self.devices`` so that ``shutdown()`` can clean them up.
        """
        LOG.info("Get device by selector")
        device = self._apply_device(selector, timeout)
        if device is not None:
            LOG.info("Device {}: extend value: {}".format(
                convert_serial(device.device_sn), device.extend_value))
            self.devices.append(device)
        else:
            LOG.info("Require label is '{}', can't get device".
                     format(selector.label))
        return device

    def init_pool(self, node):
        """
        Initialise every loaded manager that accepts ``node``.

        A manager is activated when it is both an ``IFilter`` and an
        ``IDeviceManager`` and its ``__filter_xml_node__`` accepts the node.

        Args:
            node: an ``XMLNode`` whose ``format()`` output is passed to the
                manager's ``init_environment``.
        """
        LOG.info("Prepare to init pool")
        for manager in self._managers.values():
            if not isinstance(manager, IFilter):
                continue
            if not manager.__filter_xml_node__(node):
                continue
            if not isinstance(manager, IDeviceManager):
                continue
            manager.init_environment(node.format(), "")
            self._filters[manager.__class__.__name__] = manager
        LOG.info("Pool is prepared")
        if not self._filters:
            # The two literals are concatenated, so the first one needs a
            # trailing space to keep the words apart.
            LOG.info("Can't find any manager, may be no connector are assign "
                     "or the plugins of manager are not installed!")

    def shutdown(self):
        """
        Release the devices handed out so far, unload the managers and stop
        the task log.
        """
        # clear device rpc port
        for device in self.devices:
            if hasattr(device, "remove_ports"):
                device.remove_ports()
        self._unload_manager()
        self._stop_task_log()

    def _apply_device(self, selector, timeout=3):
        """
        Ask each activated manager in turn for a device matching ``selector``.

        Args:
            selector: a ``Selector`` describing the wanted device.
            timeout: value forwarded to the manager's ``apply_device``.

        Returns:
            The first truthy device returned by a manager, otherwise None.
        """
        LOG.info("Apply device in pool")
        # Selector.format() resets the selector's accumulated state, so the
        # option is built once (after the first manager accepts the selector)
        # and shared by all managers instead of being rebuilt from an already
        # emptied selector on every iteration.
        device_option = None
        for manager_type, manager in self._filters.items():
            if not manager.__filter_selector__(selector):
                continue
            if not device_option:
                device_option = selector.format()
            if not device_option:
                continue
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.info("'{}' not in {}'s support types".format(
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                # Without a label only the generic device manager may serve
                # the request.
                if manager_type != "ManagerDevice":
                    continue
            elif device_option.label not in support_labels:
                continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        return None

    @classmethod
    def _init_log_level(cls, level):
        """
        Set the console log level; anything but "debug"/"info"
        (case-insensitive) is rejected with a log message.
        """
        if str(level).lower() not in ["debug", "info"]:
            LOG.info("Level str must be 'debug' or 'info'")
            return
        change_logger_level({"console": level})

    @classmethod
    def _start_task_log(cls):
        """
        Resolve the report folder, create it if needed and start the task
        log there.

        When ``EnvPool.report_path`` is empty a time-stamped folder under
        ``<cwd>/reports`` is used; the resolved path is stored back into
        ``EnvPool.report_path``.
        """
        report_folder_path = EnvPool.report_path
        if not report_folder_path:
            report_folder_path = os.path.join(
                os.path.abspath(os.getcwd()), "reports",
                get_cst_time().strftime("%Y-%m-%d-%H-%M-%S"))
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(report_folder_path, exist_ok=True)
        LOG.info("Report path: {}".format(report_folder_path))
        EnvPool.report_path = report_folder_path
        Scheduler.start_task_log(report_folder_path)

    @classmethod
    def _stop_task_log(cls):
        """Stop the task log via the scheduler."""
        Scheduler.stop_task_logcat()


class XMLNode(metaclass=ABCMeta):
    """
    Base class for builders of the ``<device>`` element of a user config.

    Subclasses supply the attributes of the ``<device>`` element through
    ``__on_root_attrib__``.
    """

    @abstractmethod
    def __init__(self):
        self.__device_ele = Element("device")
        self.__connectors = []

    @abstractmethod
    def __on_root_attrib__(self, attrib_dict):
        """Fill ``attrib_dict`` with the attributes of the device element."""
        pass

    def add_element_string(self, element_str=""):
        """
        Replace the device element with the one parsed from ``element_str``.

        The string is only adopted when its root tag is ``device``; an empty
        string is ignored. Returns ``self`` for chaining.
        """
        if element_str:
            device_ele = ElementTree.fromstring(element_str)
            if device_ele.tag == "device":
                self.__device_ele = device_ele
        return self

    @classmethod
    def create_node(cls, tag):
        """Create a new, empty element with the given tag."""
        return Element(tag)

    def build_connector(self, connector_name):
        """Record a connector name. Returns ``self`` for chaining."""
        self.__connectors.append(connector_name)
        return self

    def get_connectors(self):
        """Return the list of recorded connector names."""
        return self.__connectors

    def format(self):
        """
        Serialise the node as
        ``<user_config><environment><device .../></environment></user_config>``.

        Returns:
            The UTF-8 encoded XML document as bytes.
        """
        attrib_dict = dict()
        self.__on_root_attrib__(attrib_dict)
        self.__device_ele.attrib = attrib_dict
        env = self.create_node("environment")
        env.append(self.__device_ele)
        root = self.create_node("user_config")
        root.append(env)
        return ElementTree.tostring(root, encoding="utf-8")

    def get_root_node(self):
        """Return the underlying ``<device>`` element."""
        return self.__device_ele


class Selector(metaclass=ABCMeta):
    """
    Base class for device selection requests.

    A selector accumulates a type, a label and free-form environment content
    and turns them into a ``SelectionOption`` via ``format()``.
    """

    @abstractmethod
    def __init__(self, _type, label):
        self.__device_dict = dict()
        self.__config = dict()
        self.label = label
        self.type = _type

    def add_environment_content(self, content):
        """
        Merge environment content into the selector.

        Args:
            content: a dict, a list whose first item is a dict, or a JSON
                string encoding either of those. An empty list contributes
                nothing. Other types are ignored.

        Returns:
            ``self`` for chaining.

        Raises:
            RuntimeError: if a string is neither a JSON array nor a JSON
                object.
        """
        _content = content
        if isinstance(_content, str):
            _content = _content.strip()
            is_array = _content.startswith("[") and _content.endswith("]")
            is_object = _content.startswith("{") and _content.endswith("}")
            if not (is_array or is_object):
                raise RuntimeError("Invalid str input! ['{}']".format(_content))
            _content = json.loads(_content)
        if isinstance(_content, list):
            # Only the first entry is used; an empty list has nothing to add.
            if _content:
                self.__device_dict.update(_content[0])
        elif isinstance(_content, dict):
            self.__device_dict.update(_content)
        return self

    @abstractmethod
    def __on_config__(self, config, device_dict):
        """Hook: fill ``config`` (and optionally adjust ``device_dict``)."""
        pass

    @abstractmethod
    def __on_selection_option__(self, selection_option):
        """Hook: post-process the freshly built ``SelectionOption``."""
        pass

    def add_label(self, label):
        """Set the required label. Returns ``self`` for chaining."""
        self.label = label
        return self

    def add_type(self, _type):
        """Set the required type. Returns ``self`` for chaining."""
        self.type = _type
        return self

    def format(self):
        """
        Build a ``SelectionOption`` from the accumulated state.

        The label defaults to "phone" and the required manager to "device"
        when neither the selector attributes nor the environment content
        provide them. The accumulated environment content and config are
        cleared afterwards.

        Returns:
            The populated ``SelectionOption``.
        """
        if self.type or self.label:
            self.__device_dict.update({"type": self.type})
            self.__device_dict.update({"label": self.label})
        self.__on_config__(self.__config, self.__device_dict)
        label = self.__device_dict.get("label", "phone")
        required_manager = self.__device_dict.get("type", "device")
        device_option = SelectionOption(self.__config, label)
        self.__device_dict.pop("type", None)
        self.__device_dict.pop("label", None)
        device_option.required_manager = required_manager
        # Store a snapshot: the internal dict is cleared below, which would
        # otherwise empty the option's extend_value as well.
        device_option.extend_value = dict(self.__device_dict)
        device_option.env_index = 1
        self.__on_selection_option__(device_option)
        self.__device_dict.clear()
        self.__config.clear()
        return device_option


class SelectionOption:
    """Concrete criteria a device has to satisfy to be selected."""

    def __init__(self, options, label=None):
        """
        Args:
            options: dict of settings; ``device_sn`` may hold several serial
                numbers separated by ";". A missing or empty value means
                "any serial number".
            label: required device label.
        """
        raw_device_sn = options.get("device_sn") or ""
        self.device_sn = [x for x in raw_device_sn.split(";") if x]
        self.label = label
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.env_index = None

    def get_label(self):
        """Return the required label."""
        return self.label

    def get_env_index(self):
        """Return the environment index assigned to this option."""
        return self.env_index

    def matches(self, device):
        """
        Tell whether ``device`` satisfies this option.

        A device is rejected when its ``task_state`` attribute is falsy
        (a missing attribute counts as True) or when serial numbers were
        requested and the device's serial number is not among them.
        """
        LOG.info("Do matches, device:[state:{}, sn:{}, label:{}], selection "
                 "option:[device sn:{}, label:{}]".format(
                     device.device_allocation_state,
                     convert_serial(device.device_sn),
                     device.label,
                     [convert_serial(sn) if sn else "" for sn in self.device_sn],
                     self.label))
        if not getattr(device, "task_state", True):
            return False

        if self.device_sn and device.device_sn not in self.device_sn:
            return False

        return True


class DeviceNode(XMLNode):
    """``XMLNode`` describing a device by serial number and/or address."""

    def __init__(self, usb_type, label=""):
        """
        Args:
            usb_type: value of the device element's ``type`` attribute.
            label: optional value of the ``label`` attribute.
        """
        super().__init__()
        self.usb_type = usb_type
        self.label = label
        self.get_root_node().append(self.create_node("sn"))
        self.get_root_node().append(self.create_node("ip"))
        self.get_root_node().append(self.create_node("port"))

    def __on_root_attrib__(self, attrib_dict):
        attrib_dict.update({"type": self.usb_type})
        if self.label:
            attrib_dict.update({"label": self.label})

    def add_address(self, host, port):
        """
        Set the ``<ip>`` and ``<port>`` values. Returns ``self`` for chaining.

        ``port`` may be given as an int; it is stored as text because
        ElementTree can only serialise string content.
        """
        host_ele = self.get_root_node().find("ip")
        port_ele = self.get_root_node().find("port")
        host_ele.text = host
        port_ele.text = None if port is None else str(port)
        return self

    def add_device_sn(self, device_sn):
        """
        Append a serial number to ``<sn>``; several numbers are joined
        with ";". Returns ``self`` for chaining.
        """
        sn_ele = self.get_root_node().find("sn")
        if sn_ele.text:
            sn_ele.text = "{};{}".format(sn_ele.text, device_sn)
        else:
            sn_ele.text = device_sn
        return self


class DeviceSelector(Selector):
    """``Selector`` that can additionally pin the device serial number."""

    def __init__(self, _type="", label=""):
        super().__init__(_type, label)
        self.device_sn = ""

    def __on_config__(self, config, device_dict):
        config.update({"device_sn": self.device_sn})

    def __on_selection_option__(self, selection_option):
        pass

    def add_device_sn(self, device_sn):
        """Set the wanted serial number(s). Returns ``self`` for chaining."""
        self.device_sn = device_sn
        return self


class Cache:
    """
    Marker file whose modification time decides whether the pool runs in
    cache mode.
    """

    def __init__(self):
        from xdevice import Variables
        self.cache_file = os.path.join(Variables.temp_dir, "cache.dat")
        self.expire_time = 1  # days

    def check_cache_if_expire(self):
        """
        Publish the cache mode on ``sys`` under ``ConfigConst.env_pool_cache``.

        The flag is True while the marker file exists and is younger than
        ``expire_time`` days; otherwise the marker is rewritten and the flag
        is set to False.
        """
        if os.path.exists(self.cache_file):
            current_modify_time = os.path.getmtime(self.cache_file)
            current_time = time.time()
            if Cache.get_delta_days(current_modify_time, current_time) < self.expire_time:
                setattr(sys, ConfigConst.env_pool_cache, True)
                LOG.info("Env pool running in cache mode.")
                return
        self.update_cache()
        setattr(sys, ConfigConst.env_pool_cache, False)
        LOG.info("Env pool running in normal mode.")

    @staticmethod
    def get_delta_days(t1, t2):
        """Return the number of whole days from timestamp t1 to timestamp t2."""
        import datetime
        dt2 = datetime.datetime.fromtimestamp(t2)
        dt1 = datetime.datetime.fromtimestamp(t1)
        return (dt2 - dt1).days

    def update_cache(self):
        """(Re)write the marker file, refreshing its modification time."""
        # Truncate rather than append so the marker keeps a constant size
        # instead of growing on every refresh.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        with os.fdopen(os.open(self.cache_file, flags, FilePermission.mode_755),
                       "wb") as f:
            f.write(b'123')


def is_env_pool_run_mode():
    """Return True once an ``EnvPool`` instance has been created."""
    return EnvPool.instance is not None