#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from dataclasses import dataclass

from _core.config.config_manager import UserConfigManager
from _core.logger import platform_logger
from _core.logger import change_logger_level
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.utils import convert_serial
from _core.constants import ProductForm
from _core.constants import ConfigConst
from _core.environment.device_state import DeviceAllocationState

__all__ = ["EnvironmentManager", "DeviceSelectionOption", "Environment"]

LOG = platform_logger("ManagerEnv")


class Environment(object):
    """
    Environment required for each dispatch

    Holds the devices that were applied for one dispatch and assigns each
    of them a ``device_id`` made of a label prefix and a running number.
    """
    # Maps a device's product-form label to the prefix used in its
    # device_id; labels missing from this table fall back to "DUT".
    device_mapper = {
        ProductForm.phone: "Phone",
        ProductForm.tablet: "Tablet",
        ProductForm.car: "Car",
        ProductForm.television: "Tv",
        ProductForm.watch: "Watch",
    }

    def __init__(self):
        self.devices = []
        self.phone = 0
        self.wifiiot = 0
        self.ipcamera = 0
        # Last number handed out per label prefix, e.g. {"Phone": 2}
        self.device_recorder = dict()

    def __get_serial__(self):
        """
        Return the converted serials of all devices joined by ';'
        (an empty string when no device has been added).
        """
        return ";".join(
            convert_serial(device.__get_serial__())
            for device in self.devices)

    def get_devices(self):
        """
        Return the list of devices added to this environment.
        """
        return self.devices

    def check_serial(self):
        """
        Return True when the joined serial string is non-empty.
        """
        return bool(self.__get_serial__())

    def add_device(self, device, index=None):
        """
        Append a device to the environment and assign its ``device_id``.

        Parameters:
            device: device object; its ``label`` and ``device_sn`` are read
                and its ``device_id`` is written
            index: explicit number to use in the device id; when falsy
                (None or 0) the next number for the label is used instead
        """
        label = self.device_mapper.get(device.label, "DUT")
        if index:
            current = index
        else:
            current = self.device_recorder.get(label, 0) + 1
        device.device_id = "%s%s" % (label, current)
        LOG.debug("add_device, sn: {}, id: {}".format(device.device_sn, device.device_id))
        self.device_recorder[label] = current
        self.devices.append(device)


class EnvironmentManager(object):
    """
    Class representing environment manager that
    managing the set of available devices for testing
    """
    __instance = None
    __init_flag = False

    def __new__(cls, *args, **kwargs):
        """
        Singleton instance
        """
        del args, kwargs
        if cls.__instance is None:
            cls.__instance = super(EnvironmentManager, cls).__new__(cls)
        return cls.__instance

    def __init__(self, environment="", user_config_file=""):
        # __init__ runs on every EnvironmentManager(...) call even though
        # __new__ returns the same object, so skip re-initialisation until
        # env_stop() clears the flag.
        if EnvironmentManager.__init_flag:
            return
        self.managers = {}
        self.env_start(environment, user_config_file)
        EnvironmentManager.__init_flag = True

    def env_start(self, environment="", user_config_file=""):
        """
        Apply the configured log level and initialise every manager plugin.

        Parameters:
            environment: passed to UserConfigManager and to each manager's
                init_environment
            user_config_file: passed to UserConfigManager and to each
                manager's init_environment
        """
        log_level_dict = UserConfigManager(
            config_file=user_config_file, env=environment).get_log_level()
        if log_level_dict:
            # change log level when load or reset EnvironmentManager object
            change_logger_level(log_level_dict)

        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            # A manager that fails to start is logged and skipped so that
            # the remaining managers can still be registered.
            try:
                manager_instance = manager_plugin.__class__()
                manager_instance.init_environment(environment,
                                                  user_config_file)
                self.managers[manager_instance.__class__.__name__] = \
                    manager_instance
            except Exception as error:
                LOG.debug("Env start error: %s" % error)
        if self.managers:
            # Re-order managers by class name, descending. Keys are unique,
            # so the (name, instance) tuples are ordered by name alone.
            self.managers = dict(sorted(self.managers.items(), reverse=True))

    def env_stop(self):
        """
        Stop every manager, clear its device list and allow the singleton
        to be initialised again.
        """
        for manager in self.managers.values():
            manager.env_stop()
            manager.devices_list = []
        self.managers = {}

        EnvironmentManager.__init_flag = False

    def apply_environment(self, device_options):
        """
        Build an Environment holding one device per satisfiable option.

        Parameters:
            device_options: iterable of DeviceSelectionOption
        Returns:
            Environment containing the devices that could be applied;
            options for which no device was found are skipped
        """
        environment = Environment()
        for device_option in device_options:
            LOG.debug("Visit options to find device")
            device = self.apply_device(device_option)
            if device is not None:
                index = self.get_config_device_index(device)
                environment.add_device(device, index)
                device.extend_value = device_option.extend_value
                LOG.debug("Device %s: extend value: %s", convert_serial(
                    device.device_sn), device.extend_value)
            else:
                LOG.debug("Require label is '%s', then next" %
                          device_option.label)
        return environment

    def release_environment(self, environment):
        """
        Reset ``extend_value`` on and release every device of the
        environment.
        """
        for device in environment.devices:
            device.extend_value = {}
            self.release_device(device)

    def reset_environment(self, used_devices):
        """
        Reset every device in ``used_devices`` (a mapping whose values are
        devices; the keys are not used).
        """
        for device in used_devices.values():
            self.reset_device(device)

    def apply_device(self, device_option, timeout=10):
        """
        Ask the registered managers, in order, for a device matching the
        option.

        Parameters:
            device_option: DeviceSelectionOption describing the request
            timeout: forwarded to the manager's apply_device
        Returns:
            the first truthy device returned by a suitable manager,
            otherwise None
        """
        LOG.debug("Apply device from managers:%s" % self.managers)
        for manager_type, manager in self.managers.items():
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.warning("'%s' not in %s's support types" % (
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                # Requests without a label are only served by these two
                # manager types.
                if manager_type != "ManagerDevice" and \
                        manager_type != "ManagerAospDevice":
                    continue
            else:
                if support_labels and \
                        device_option.label not in support_labels:
                    continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        else:
            # Loop finished without any manager yielding a device.
            return None

    def get_config_device_index(self, device):
        """
        Return the 1-based position of the device's serial number in the
        first manager ``global_device_filter`` that contains it.

        Returns None when the device is falsy, has no ``device_sn`` or its
        serial number is not listed in any filter.
        """
        if device and hasattr(device, "device_sn"):
            sn = device.device_sn
            for manager in self.managers.values():
                if hasattr(manager, "global_device_filter"):
                    for index, filter_sn in enumerate(
                            manager.global_device_filter, start=1):
                        if filter_sn == sn:
                            return index
        return None

    def check_device_exist(self, device_options):
        """
        Check if there are matched devices which can be allocated or available.

        Each option must be matched by a distinct device (tracked by serial
        number). Returns False as soon as one option cannot be matched,
        True otherwise.
        """
        devices = []
        for device_option in device_options:
            for manager_type, manager in self.managers.items():
                support_labels = getattr(manager, "support_labels", [])
                support_types = getattr(manager, "support_types", [])
                if device_option.required_manager not in support_types:
                    continue
                if device_option.label is None:
                    if manager_type != "ManagerDevice" and \
                            manager_type != "ManagerAospDevice":
                        continue
                else:
                    if support_labels and \
                            device_option.label not in support_labels:
                        continue
                for device in manager.devices_list:
                    # A device already claimed by an earlier option cannot
                    # satisfy this one as well.
                    if device.device_sn in devices:
                        continue
                    if device_option.matches(device, False):
                        devices.append(device.device_sn)
                        break
                else:
                    # No device of this manager matched: try the next one.
                    continue
                # A device matched: stop searching managers for this option.
                break
            else:
                # Every manager was tried without a match for this option.
                return False
        return True

    def release_device(self, device):
        """
        Release the device through every manager whose ``devices_list``
        contains it.
        """
        for manager in self.managers.values():
            if device in manager.devices_list:
                manager.release_device(device)

    def reset_device(self, device):
        """
        Reset the device through every manager whose ``devices_list``
        contains it.
        """
        for manager in self.managers.values():
            if device in manager.devices_list:
                manager.reset_device(device)

    def list_devices(self):
        """
        Ask every manager to list its devices.
        """
        LOG.info("List devices.")
        for manager in self.managers.values():
            manager.list_devices()


class DeviceSelectionOption(object):
    """
    Class representing device selection option
    """

    def __init__(self, options, label=None, test_source=None):
        """
        Parameters:
            options: mapping whose "device_sn" entry is a ';'-separated
                string of serial numbers (empty entries are dropped)
            label: required device label, or None for no label constraint
            test_source: object providing ``test_type``; may be None, in
                which case ``test_driver`` is None
        """
        self.device_sn = [x for x in options["device_sn"].split(";") if x]
        self.label = label
        # test_source defaults to None, so it must not be dereferenced
        # unconditionally.
        self.test_driver = \
            test_source.test_type if test_source is not None else None
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.required_component = ""
        self.env_index = None

    def get_label(self):
        """
        Return the required device label.
        """
        return self.label

    def get_env_index(self):
        """
        Return the environment index stored on this option.
        """
        return self.env_index

    def matches(self, device, allocate=True):
        """
        Tell whether the device satisfies this selection option.

        Parameters:
            device: device object to test
            allocate: when True the device must be in the available state;
                when False it may be either available or allocated
        Returns:
            True if the device passes the task-state, allocation-state,
            serial-number, label and component checks, otherwise False
        """
        LOG.debug("Do matches, device:{state:%s, sn:%s, label:%s}, selection "
                  "option:{device sn:%s, label:%s}" % (
                      device.device_allocation_state,
                      convert_serial(device.device_sn),
                      device.label,
                      [convert_serial(sn) if sn else "" for sn in self.device_sn],
                      self.label))
        # Devices without a task_state attribute are treated as usable.
        if not getattr(device, "task_state", True):
            return False

        state = device.device_allocation_state
        if allocate:
            if state != DeviceAllocationState.available:
                return False
        elif state not in (DeviceAllocationState.available,
                           DeviceAllocationState.allocated):
            return False

        # An empty serial-number list means "any device".
        if self.device_sn and device.device_sn not in self.device_sn:
            return False

        if self.label and self.label != device.label:
            return False

        if self.required_component and \
                hasattr(device, ConfigConst.support_component):
            subsystems, parts = getattr(device, ConfigConst.support_component)
            required_subsystems, require_part = self.required_component
            # Rejected only when neither the subsystem nor the part is
            # supported by the device.
            if required_subsystems not in subsystems and \
                    require_part not in parts:
                return False
        return True