#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from inspect import signature

from _core.error import ErrorMessage
from _core.interface import IDriver
from _core.interface import IParser
from _core.interface import IListener
from _core.interface import IScheduler
from _core.interface import IDevice
from _core.interface import ITestKit
from _core.interface import IDeviceManager
from _core.interface import IReporter

__all__ = ["Config", "Plugin", "get_plugin", "set_plugin_params",
           "get_all_plugins", "clear_plugin_cache"]

# Registry of plugin instances, keyed by the tuple (plugin_type, plugin_id).
# Each value is a list of instances registered under that key.
_PLUGINS = dict()
# Name of the class attribute that holds a plugin's Config object.
_DEFAULT_CONFIG_NAME = "_plugin_config_"


class Config:
    """
    The common configuration.

    A thin attribute bag: every parameter is stored in the instance
    ``__dict__`` and can be read or written through attribute access,
    item access, or the ``get``/``set`` methods.
    """

    def __init__(self, params=None):
        """
        :param params: optional mapping of initial parameters; ``None``
            creates an empty configuration
        """
        if params is None:
            params = {}
        self.update(params)

    def __getitem__(self, item):
        # A missing key yields None instead of raising KeyError.
        return self.__dict__.get(item)

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def update(self, params):
        """
        Merge the given parameters into this configuration
        :param params: mapping of parameter names to values
        """
        self.__dict__.update(params)

    def get(self, key, default=""):
        """
        Get a parameter value
        :param key: parameter name
        :param default: value returned when key is absent; note that it
            defaults to an empty string, not None
        :return: the stored value or default
        """
        return self.__dict__.get(key, default)

    def set(self, key, value):
        """
        Set a parameter value
        :param key: parameter name
        :param value: parameter value
        """
        self.__dict__[key] = value


class Plugin:
    """
    Plugin decorator with parameters. You can decorate one class as following:
    @Plugin("type_name"), the default plugin id is the same as TypeName.
    @Plugin(type="type_name", id="plugin_id")

    Decorating a class instantiates it once and stores that instance in the
    module-level registry under the key (plugin_type, plugin_id).
    """
    SCHEDULER = "scheduler"
    DRIVER = "driver"
    DEVICE = "device"
    LOG = "log"
    PARSER = "parser"
    LISTENER = "listener"
    TEST_KIT = "testkit"
    MANAGER = "manager"
    REPORTER = "reporter"

    # Interfaces an instance must implement for each built-in plugin type.
    # Types that are not listed here (e.g. LOG) are registered unchecked.
    _builtin_plugin = {
        SCHEDULER: [IScheduler],
        DRIVER: [IDriver],
        DEVICE: [IDevice],
        PARSER: [IParser],
        LISTENER: [IListener],
        TEST_KIT: [ITestKit],
        MANAGER: [IDeviceManager],
        REPORTER: [IReporter],
    }

    def __init__(self, *args, **kwargs):
        """
        :param args: a single string used as both plugin type and plugin id
        :param kwargs: alternatively "type" and "id" keywords; every other
            keyword becomes a default parameter of the plugin config
        :raises ValueError: when neither calling form is matched
        """
        _param_dict = dict(kwargs)
        # isinstance (rather than an exact type comparison) also accepts
        # str subclasses; the value is still normalized through str().
        if len(args) == 1 and isinstance(args[0], str):
            self.plugin_type = str(args[0])
            self.plugin_id = str(args[0])
        elif "id" in _param_dict and "type" in _param_dict:
            # "type" and "id" identify the plugin and are not config params.
            self.plugin_type = _param_dict.pop("type")
            self.plugin_id = _param_dict.pop("id")
        else:
            raise ValueError(ErrorMessage.InterfaceImplement.Code_0102001)
        self.params = _param_dict

    def __call__(self, cls):
        """
        Register the decorated class as a plugin
        :param cls: the class being decorated
        :return: the same class, with the plugin config attribute and a
            get_plugin_config method attached
        :raises TypeError: when cls already has the config attribute or a
            get_plugin_config attribute, when its __init__ takes anything
            besides self, or when its instance does not implement the
            interface required for a built-in plugin type
        """
        if hasattr(cls, _DEFAULT_CONFIG_NAME):
            raise TypeError(ErrorMessage.InterfaceImplement.Code_0102002.format(_DEFAULT_CONFIG_NAME, cls.__name__))
        setattr(cls, _DEFAULT_CONFIG_NAME, Config(self.params))

        # The class is instantiated below without arguments, so a
        # user-defined __init__ must have exactly one parameter (self).
        # A "wrapper_descriptor" is skipped because its signature is not
        # checked here.
        init_func = getattr(cls, "__init__", None)
        if init_func and type(
                init_func).__name__ != "wrapper_descriptor" and len(
                signature(init_func).parameters) != 1:
            raise TypeError(ErrorMessage.InterfaceImplement.Code_0102003.format(cls.__name__))

        if hasattr(cls, "get_plugin_config"):
            raise TypeError(ErrorMessage.InterfaceImplement.Code_0102004.format("get_plugin_config", cls.__name__))

        def get_plugin_config(obj):
            # The config lives on the class, so the instance is not needed.
            del obj
            return getattr(cls, _DEFAULT_CONFIG_NAME)

        setattr(cls, "get_plugin_config", get_plugin_config)

        instance = cls()
        interfaces = self._builtin_plugin.get(self.plugin_type, [])
        for interface in interfaces:
            if not isinstance(instance, interface):
                raise TypeError(ErrorMessage.InterfaceImplement.Code_0102005.format(cls.__name__, interface))

        registered = _PLUGINS.setdefault(
            (self.plugin_type, self.plugin_id), [])
        # Instances whose class representation contains "xdevice" go to the
        # end of the list; all others go to the front, so they are the
        # first entry returned for this key.
        if "xdevice" in str(instance.__class__).lower():
            registered.append(instance)
        else:
            registered.insert(0, instance)

        return cls

    def get_params(self):
        """
        :return: the default parameters passed to the decorator
        """
        return self.params

    def get_builtin_plugin(self):
        """
        :return: the mapping of built-in plugin type to required interfaces
        """
        return self._builtin_plugin


def get_plugin(plugin_type, plugin_id=None):
    """
    Get plugin instance
    :param plugin_type: plugin type
    :param plugin_id: plugin id; when None, the first instance of every
        id registered under plugin_type is collected
    :return: the instance list of plugin (empty when nothing matches)
    """
    if plugin_id is not None:
        return _PLUGINS.get((plugin_type, plugin_id), [])

    plugins = []
    for key, instances in _PLUGINS.items():
        if key[0] != plugin_type or not instances:
            continue
        if key[1] == plugin_type:
            # The plugin whose id equals its type is listed first.
            plugins.insert(0, instances[0])
        else:
            plugins.append(instances[0])
    return plugins


def set_plugin_params(plugin_type, plugin_id=None, **kwargs):
    """
    Set plugin parameters
    :param plugin_type: plugin type
    :param plugin_id: plugin id; defaults to plugin_type when None
    :param kwargs: the parameters for plugin
    :return:
    :raises ValueError: when no plugin is registered for the given
        type and id
    """
    if plugin_id is None:
        plugin_id = plugin_type
    plugins = get_plugin(plugin_type, plugin_id)
    if not plugins:
        raise ValueError(ErrorMessage.InterfaceImplement.Code_0102006.format(plugin_id))
    for plugin in plugins:
        params = getattr(plugin, _DEFAULT_CONFIG_NAME)
        params.update(kwargs)


def get_all_plugins():
    """
    Get all plugins
    :return: a shallow copy of the registry; the instance lists inside it
        are the registry's own lists
    """
    return dict(_PLUGINS)


def clear_plugin_cache():
    """
    Clear all cached plugins
    """
    _PLUGINS.clear()