#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from inspect import signature

from _core.interface import IDriver
from _core.interface import IParser
from _core.interface import IListener
from _core.interface import IScheduler
from _core.interface import IDevice
from _core.interface import ITestKit
from _core.interface import IDeviceManager
from _core.interface import IReporter

__all__ = ["Config", "Plugin", "get_plugin", "set_plugin_params",
           "get_all_plugins", "clear_plugin_cache"]

# Registry of plugin instances: (plugin_type, plugin_id) -> list of instances
_PLUGINS = dict()
# Name of the class attribute that stores a plugin's Config object
_DEFAULT_CONFIG_NAME = "_plugin_config_"


class Config:
    """
    The common configuration.

    A thin attribute bag: every parameter is stored in the instance
    ``__dict__`` and can be read or written as an attribute, with
    subscription (``config[key]``) or through ``get`` / ``set``.
    """

    def __init__(self, params=None):
        """
        :param params: optional mapping of initial parameters
        """
        if params is None:
            params = {}
        self.update(params)

    def __getitem__(self, item):
        # A missing key yields None instead of raising KeyError.
        return self.__dict__.get(item)

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def update(self, params):
        """
        Merge the given mapping into the stored parameters
        :param params: mapping of parameter names to values
        """
        self.__dict__.update(params)

    def get(self, key, default=""):
        """
        Get one parameter
        :param key: parameter name
        :param default: value returned when the key is absent
        :return: the stored value or the default
        """
        return self.__dict__.get(key, default)

    def set(self, key, value):
        """
        Set one parameter
        :param key: parameter name
        :param value: parameter value
        """
        self.__dict__[key] = value


class Plugin:
    """
    Plugin decorator with parameters. You can decorate one class as following:
    @Plugin("type_name"), the plugin id defaults to the type name.
    @Plugin(type="type_name", id="plugin_id")

    Any other keyword arguments are kept as the plugin parameters and are
    exposed on the decorated class through ``get_plugin_config()``.
    """
    SCHEDULER = "scheduler"
    DRIVER = "driver"
    DEVICE = "device"
    LOG = "log"
    PARSER = "parser"
    LISTENER = "listener"
    TEST_KIT = "testkit"
    MANAGER = "manager"
    REPORTER = "reporter"

    # Interfaces an instance must satisfy for each built-in plugin type.
    # Types that are not listed here (e.g. LOG) are not interface-checked.
    _builtin_plugin = {
        SCHEDULER: [IScheduler],
        DRIVER: [IDriver],
        DEVICE: [IDevice],
        PARSER: [IParser],
        LISTENER: [IListener],
        TEST_KIT: [ITestKit],
        MANAGER: [IDeviceManager],
        REPORTER: [IReporter],
    }

    def __init__(self, *args, **kwargs):
        """
        :param args: a single string used as both plugin type and plugin id
        :param kwargs: ``type`` and ``id`` (when no single string argument
            is given) plus arbitrary plugin parameters
        :raises ValueError: if neither form identifies the plugin
        """
        _param_dict = dict(kwargs)
        if len(args) == 1 and isinstance(args[0], str):
            self.plugin_type = str(args[0])
            self.plugin_id = str(args[0])
        elif "id" in _param_dict and "type" in _param_dict:
            # type/id identify the plugin; they are not plugin parameters
            self.plugin_type = _param_dict.pop("type")
            self.plugin_id = _param_dict.pop("id")
        else:
            raise ValueError(
                '@Plugin must be specify type and id attributes. such as '
                '@Plugin("plugin_type") or '
                '@Plugin(type="plugin_type", id="plugin_id")')
        self.params = _param_dict

    def __call__(self, cls):
        """
        Decorate ``cls``: attach its configuration, instantiate it and
        register the instance under (plugin_type, plugin_id).

        :param cls: the plugin class, which must be constructible without
            arguments
        :return: the same class
        :raises TypeError: if the class already has the reserved config
            attribute or a ``get_plugin_config`` member, if ``__init__``
            takes arguments, or if the instance does not implement the
            interface required for a built-in plugin type
        """
        # Run every check on the class before modifying it, so that a
        # rejected class is not left half-decorated.
        if hasattr(cls, _DEFAULT_CONFIG_NAME):
            raise TypeError(
                "'{}' attribute is not allowed for plugin {} .".format(
                    _DEFAULT_CONFIG_NAME, cls.__name__))

        # In CPython the inherited object.__init__ is a "wrapper_descriptor";
        # only a user-defined __init__ is inspected, and it may take nothing
        # but self.
        init_func = getattr(cls, "__init__", None)
        if init_func and type(
                init_func).__name__ != "wrapper_descriptor" and len(
                signature(init_func).parameters) != 1:
            raise TypeError(
                "__init__ method must be no arguments for plugin {} .".format(
                    cls.__name__))

        if hasattr(cls, "get_plugin_config"):
            raise TypeError(
                "'{}' method is not allowed for plugin {} .".format(
                    "get_plugin_config", cls.__name__))

        setattr(cls, _DEFAULT_CONFIG_NAME, Config(self.params))

        def get_plugin_config(obj):
            # The configuration lives on the class; the instance is unused.
            del obj
            return getattr(cls, _DEFAULT_CONFIG_NAME)

        setattr(cls, "get_plugin_config", get_plugin_config)

        instance = cls()

        interfaces = self._builtin_plugin.get(self.plugin_type, [])
        for interface in interfaces:
            if not isinstance(instance, interface):
                raise TypeError(
                    "{} plugin must implement {} interface.".format(
                        cls.__name__, interface))

        registered = _PLUGINS.setdefault(
            (self.plugin_type, self.plugin_id), [])
        if "xdevice" in str(instance.__class__).lower():
            registered.append(instance)
        else:
            # Classes whose qualified name does not mention "xdevice" go to
            # the front -- presumably so that externally provided plugins
            # take precedence over the framework's own.
            registered.insert(0, instance)

        return cls

    def get_params(self):
        """
        :return: the plugin parameters given to the decorator
        """
        return self.params

    def get_builtin_plugin(self):
        """
        :return: the mapping of built-in plugin types to required interfaces
        """
        return self._builtin_plugin


def get_plugin(plugin_type, plugin_id=None):
    """
    Get plugin instance
    :param plugin_type: plugin type
    :param plugin_id: plugin id; when None, the first instance of every
        plugin id registered under plugin_type is returned, with the id
        equal to plugin_type placed first
    :return: the instance list of plugin
    """
    if plugin_id is not None:
        return _PLUGINS.get((plugin_type, plugin_id), [])

    plugins = []
    for (registered_type, registered_id), instances in _PLUGINS.items():
        if registered_type != plugin_type or not instances:
            continue
        if registered_id == plugin_type:
            plugins.insert(0, instances[0])
        else:
            plugins.append(instances[0])
    return plugins


def set_plugin_params(plugin_type, plugin_id=None, **kwargs):
    """
    Set plugin parameters
    :param plugin_type: plugin type
    :param plugin_id: plugin id, defaults to plugin_type
    :param kwargs: the parameters for plugin
    :return:
    :raises ValueError: if no plugin is registered under the given type/id
    """
    if plugin_id is None:
        plugin_id = plugin_type
    plugins = get_plugin(plugin_type, plugin_id)
    if not plugins:
        raise ValueError("Can not find the plugin %s" % plugin_id)
    for plugin in plugins:
        params = getattr(plugin, _DEFAULT_CONFIG_NAME)
        params.update(kwargs)


def get_all_plugins():
    """
    Get all plugins
    :return: a shallow copy of the plugin registry
    """
    return dict(_PLUGINS)


def clear_plugin_cache():
    """
    Clear all cached plugins
    """
    _PLUGINS.clear()