#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from dataclasses import dataclass

from _core.config.config_manager import UserConfigManager
from _core.logger import platform_logger
from _core.logger import change_logger_level
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.utils import convert_serial
from _core.constants import ProductForm
from _core.constants import ConfigConst
from _core.environment.device_state import DeviceAllocationState

__all__ = ["EnvironmentManager", "DeviceSelectionOption", "Environment"]

LOG = platform_logger("ManagerEnv")


class Environment(object):
    """
    Environment required for each dispatch
    """
    # Maps a device's product form to the prefix used when building its
    # device_id; product forms not listed here fall back to "DUT".
    device_mapper = {
        ProductForm.phone: "Phone",
        ProductForm.tablet: "Tablet",
        ProductForm.car: "Car",
        ProductForm.television: "Tv",
        ProductForm.watch: "Watch",
    }

    def __init__(self):
        # Devices added through add_device(), in insertion order.
        self.devices = []
        # Initialised to 0 here; not read or updated anywhere in this file.
        self.phone = 0
        self.wifiiot = 0
        self.ipcamera = 0
        # Label prefix -> last index handed out for that prefix.
        self.device_recorder = dict()

    def __get_serial__(self):
        """
        Return the converted serials of all devices joined with ';'.
        The result is an empty string when no device has been added.
        """
        return ";".join(
            convert_serial(device.__get_serial__())
            for device in self.devices)

    def get_devices(self):
        """
        Return the list of devices held by this environment.
        """
        return self.devices

    def get_description(self):
        """
        Collect the 'device_description' of every device.
        A device whose description cannot be read is logged and skipped,
        so the returned list may be shorter than the device list.
        """
        descriptions = []
        for d in self.devices:
            try:
                descriptions.append(d.device_description)
            except Exception as e:
                # Best effort: one failing device must not hide the others.
                LOG.error(f"get device description error: {e}")
        return descriptions

    def check_serial(self):
        """
        Return True when the joined serial string is non-empty.
        """
        return bool(self.__get_serial__())

    def add_device(self, device, index=None):
        """
        Register a device and assign its device_id ("<prefix><index>").

        Parameters:
            device: object exposing 'label' and 'device_sn'; its
                'device_id' attribute is set by this call
            index: explicit index to use; when falsy (None or 0) the next
                index after the last one recorded for the prefix is used
        """
        label = self.device_mapper.get(device.label, "DUT")
        if index:
            current = index
        else:
            current = self.device_recorder.get(label, 0) + 1
        device.device_id = "%s%s" % (label, current)
        LOG.debug("add_device, sn: {}, id: {}".format(
            device.device_sn, device.device_id))
        self.device_recorder.update({label: current})
        self.devices.append(device)


class EnvironmentManager(object):
    """
    Class representing environment manager that
    managing the set of available devices for testing
    """
    __instance = None
    __init_flag = False

    def __new__(cls, *args, **kwargs):
        """
        Singleton instance
        """
        # Constructor arguments are consumed by __init__ only.
        del args, kwargs
        if cls.__instance is None:
            cls.__instance = super(EnvironmentManager, cls).__new__(cls)
        return cls.__instance

    def __init__(self, environment="", user_config_file=""):
        # __init__ runs on every EnvironmentManager(...) call even though
        # __new__ returns the same object, so guard against re-initialising.
        # env_stop() clears the flag, which allows a later re-initialisation.
        if EnvironmentManager.__init_flag:
            return
        self.managers = {}
        self.env_start(environment, user_config_file)
        EnvironmentManager.__init_flag = True

    def env_start(self, environment="", user_config_file=""):
        """
        Apply the configured log level and initialise every manager plugin.

        Parameters:
            environment: passed to UserConfigManager as 'env' and to each
                manager's init_environment()
            user_config_file: passed to UserConfigManager as 'config_file'
                and to each manager's init_environment()
        """
        log_level_dict = UserConfigManager(
            config_file=user_config_file, env=environment).get_log_level()
        if log_level_dict:
            # change log level when load or reset EnvironmentManager object
            change_logger_level(log_level_dict)

        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            try:
                # A fresh instance of the plugin's class is created rather
                # than reusing the registered plugin object.
                manager_instance = manager_plugin.__class__()
                manager_instance.init_environment(environment,
                                                  user_config_file)
                self.managers[manager_instance.__class__.__name__] = \
                    manager_instance
            except Exception as error:
                # A manager that fails to start is skipped; the remaining
                # managers are still initialised.
                LOG.debug("Env start error: %s" % error)
        if self.managers:
            # Managers are visited in reverse alphabetical order of their
            # class name from here on.
            self.managers = dict(sorted(self.managers.items(), reverse=True))

    def env_stop(self):
        """
        Stop every manager, drop them and allow a later re-initialisation.
        """
        for manager in self.managers.values():
            manager.env_stop()
            manager.devices_list = []
        self.managers = {}

        EnvironmentManager.__init_flag = False

    def apply_environment(self, device_options):
        """
        Build an Environment by applying one device per selection option.
        Options for which no device can be applied are logged and skipped.

        Parameters:
            device_options: iterable of DeviceSelectionOption
        Returns:
            the Environment holding every device that could be applied
        """
        environment = Environment()
        for device_option in device_options:
            LOG.debug("Visit options to find device")
            device = self.apply_device(device_option)
            if device is not None:
                index = self.get_config_device_index(device)
                environment.add_device(device, index)
                device.extend_value = device_option.extend_value
                LOG.debug("Device %s: extend value: %s", convert_serial(
                    device.device_sn), device.extend_value)
                if hasattr(device, "extend_device_props"):
                    device.extend_device_props()
                device.init_description()
            else:
                LOG.debug("Require label is '%s', then next" %
                          device_option.label)
        return environment

    def release_environment(self, environment):
        """
        Clear the extend value of every device in the environment and
        release it through its manager.
        """
        for device in environment.devices:
            device.extend_value = {}
            self.release_device(device)

    def reset_environment(self, used_devices):
        """
        Reset every device found in the values of the 'used_devices' dict.
        """
        for device in used_devices.values():
            self.reset_device(device)

    def apply_device(self, device_option, timeout=3):
        """
        Ask each suitable manager in turn for a device matching the option.

        A manager is skipped when it does not support the option's
        required manager type, when it declares no support labels, when the
        option has no label and the manager is not "ManagerDevice", or when
        the option's label is not among the manager's support labels.

        Parameters:
            device_option: DeviceSelectionOption describing the request
            timeout: forwarded unchanged to the manager's apply_device()
        Returns:
            the first truthy device returned by a manager, otherwise None
        """
        LOG.debug("Apply device from managers:%s" % self.managers)
        for manager_type, manager in self.managers.items():
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.warning("'%s' not in %s's support types" % (
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                if manager_type != "ManagerDevice":
                    continue
            elif device_option.label not in support_labels:
                # support_labels is known to be non-empty at this point.
                continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        # No manager could provide a device.
        return None

    def get_config_device_index(self, device):
        """
        Return the 1-based position of the device's sn in the first
        manager 'global_device_filter' that contains it, or None when the
        device has no 'device_sn' or no filter lists it.
        """
        if not device or not hasattr(device, "device_sn"):
            return None
        sn = device.device_sn
        for manager in self.managers.values():
            if not hasattr(manager, "global_device_filter"):
                continue
            for index, filter_sn in enumerate(
                    manager.global_device_filter, start=1):
                if filter_sn == sn:
                    return index
        return None

    def check_device_exist(self, device_options):
        """
        Check if there are matched devices which can be allocated or available.

        Each option must be matched by a device whose sn has not already
        been claimed by an earlier option.

        Returns:
            True when every option found a device, False as soon as one
            option cannot be matched
        """
        devices = []
        for device_option in device_options:
            for manager_type, manager in self.managers.items():
                support_labels = getattr(manager, "support_labels", [])
                support_types = getattr(manager, "support_types", [])
                if device_option.required_manager not in support_types:
                    continue
                if device_option.label is None:
                    if manager_type != "ManagerDevice":
                        continue
                else:
                    if support_labels and \
                            device_option.label not in support_labels:
                        continue
                for device in manager.devices_list:
                    if device.device_sn in devices:
                        continue
                    if device_option.matches(device, False):
                        devices.append(device.device_sn)
                        break
                else:
                    # This manager had no match, try the next manager.
                    continue
                # A device was claimed for this option, stop searching.
                break
            else:
                # Every manager was tried without finding a match.
                return False
        return True

    def release_device(self, device):
        """
        Release the device through every manager that lists it.
        """
        for manager in self.managers.values():
            if device in manager.devices_list:
                manager.release_device(device)

    def reset_device(self, device):
        """
        Reset the device through every manager that lists it.
        """
        for manager in self.managers.values():
            if device in manager.devices_list:
                manager.reset_device(device)

    def list_devices(self):
        """
        Ask every manager to list its devices.
        """
        LOG.info("List devices.")
        for manager in self.managers.values():
            manager.list_devices()


class DeviceSelectionOption(object):
    """
    Class representing device selection option
    """

    def __init__(self, options, label=None, test_source=None):
        """
        Parameters:
            options: mapping whose "device_sn" entry is a ';' separated
                string of serial numbers; empty entries are dropped
            label: required device label, None for no label requirement
            test_source: object providing 'test_type'; when omitted the
                test driver is left as None
        """
        self.device_sn = [x for x in options["device_sn"].split(";") if x]
        self.label = label
        # test_source defaults to None, so it must not be dereferenced
        # unconditionally.
        self.test_driver = getattr(test_source, "test_type", None)
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.required_component = ""
        self.env_index = None

    def get_label(self):
        """
        Return the required device label.
        """
        return self.label

    def get_env_index(self):
        """
        Return the environment index stored on this option.
        """
        return self.env_index

    def matches(self, device, allocate=True):
        """
        Tell whether the device satisfies this selection option.

        Parameters:
            device: candidate device
            allocate: when True the device must be in the 'available'
                state; when False 'available' and 'allocated' are accepted
        Returns:
            True when the state, serial number, label and required
            component checks all pass, otherwise False
        """
        LOG.debug("Do matches, device:{state:%s, sn:%s, label:%s}, selection "
                  "option:{device sn:%s, label:%s}" % (
                      device.device_allocation_state,
                      convert_serial(device.device_sn),
                      device.label,
                      [convert_serial(sn) if sn else "" for sn in self.device_sn],
                      self.label))
        # Devices without a 'task_state' attribute are treated as usable.
        if not getattr(device, "task_state", True):
            return False
        if allocate and device.device_allocation_state != \
                DeviceAllocationState.available:
            return False

        if not allocate:
            if device.device_allocation_state != \
                    DeviceAllocationState.available and \
                    device.device_allocation_state != \
                    DeviceAllocationState.allocated:
                return False

        # An empty sn list means any serial number is acceptable.
        if len(self.device_sn) != 0 and device.device_sn not in self.device_sn:
            return False

        if self.label and self.label != device.label:
            return False
        if self.required_component and \
                hasattr(device, ConfigConst.support_component):
            subsystems, parts = getattr(device, ConfigConst.support_component)
            required_subsystems, require_part = self.required_component
            # Rejected only when neither the subsystem nor the part is
            # supported by the device.
            if required_subsystems not in subsystems and \
                    require_part not in parts:
                return False
        return True