• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from inspect import signature
20
21from _core.interface import IDriver
22from _core.interface import IParser
23from _core.interface import IListener
24from _core.interface import IScheduler
25from _core.interface import IDevice
26from _core.interface import ITestKit
27from _core.interface import IDeviceManager
28from _core.interface import IReporter
29
30__all__ = ["Config", "Plugin", "get_plugin", "set_plugin_params",
31           "get_all_plugins", "clear_plugin_cache"]
32
33# plugins dict
34_PLUGINS = dict()
35# plugin config name
36_DEFAULT_CONFIG_NAME = "_plugin_config_"
37
38
39class Config:
40    """
41    The common configuration
42    """
43
44    def __init__(self, params=None):
45        if params is None:
46            params = {}
47        self.update(params)
48
49    def __getitem__(self, item):
50        return self.__dict__[item]
51
52    def __setitem__(self, key, value):
53        self.__dict__[key] = value
54
55    def update(self, params):
56        self.__dict__.update(params)
57
58    def get(self, key, default=""):
59        return self.__dict__.get(key, default)
60
61    def set(self, key, value):
62        self.__dict__[key] = value
63
64
65class Plugin(object):
66    """
67    Plugin decorator with parameters. You can decorate one class as following:
68    @Plugin("type_name"), the default plugin id is the same as TypeName.
69    @Plugin(type="type_name", id="plugin_id")
70    """
71    SCHEDULER = "scheduler"
72    DRIVER = "driver"
73    DEVICE = "device"
74    LOG = "log"
75    PARSER = "parser"
76    LISTENER = "listener"
77    TEST_KIT = "testkit"
78    MANAGER = "manager"
79    REPORTER = "reporter"
80
81    _builtin_plugin = dict({
82        SCHEDULER: [IScheduler],
83        DRIVER: [IDriver],
84        DEVICE: [IDevice],
85        PARSER: [IParser],
86        LISTENER: [IListener],
87        TEST_KIT: [ITestKit],
88        MANAGER: [IDeviceManager],
89        REPORTER: [IReporter]
90    })
91
92    def __init__(self, *args, **kwargs):
93        _param_dict = dict(kwargs)
94        if len(args) == 1 and type(args[0]) == str:
95            self.plugin_type = str(args[0])
96            self.plugin_id = str(args[0])
97        elif "id" in _param_dict.keys() and "type" in _param_dict.keys():
98            self.plugin_type = _param_dict["type"]
99            self.plugin_id = _param_dict["id"]
100            del _param_dict["type"]
101            del _param_dict["id"]
102        else:
103            raise ValueError(
104                '@Plugin must be specify type and id attributes. such as '
105                '@Plugin("plugin_type") or '
106                '@Plugin(type="plugin_type", id="plugin_id")')
107        self.params = _param_dict
108
109    def __call__(self, cls):
110        if hasattr(cls, _DEFAULT_CONFIG_NAME):
111            raise TypeError(
112                "'{}' attribute is not allowed for plugin {} .".format(
113                    _DEFAULT_CONFIG_NAME, cls.__name__))
114        setattr(cls, _DEFAULT_CONFIG_NAME, Config(self.params))
115
116        init_func = getattr(cls, "__init__", None)
117        if init_func and type(
118                init_func).__name__ != "wrapper_descriptor" and len(
119                signature(init_func).parameters) != 1:
120            raise TypeError(
121                "__init__ method must be no arguments for plugin {} .".format(
122                    cls.__name__))
123
124        if hasattr(cls, "get_plugin_config"):
125            raise TypeError(
126                "'{}' method is not allowed for plugin {} .".format(
127                    "get_plugin_config", cls.__name__))
128
129        def get_plugin_config(obj):
130            del obj
131            return getattr(cls, _DEFAULT_CONFIG_NAME)
132
133        setattr(cls, "get_plugin_config", get_plugin_config)
134
135        instance = cls()
136        interfaces = self._builtin_plugin.get(self.plugin_type, [])
137        for interface in interfaces:
138            if not isinstance(instance, interface):
139                raise TypeError(
140                    "{} plugin must implement {} interface.".format(
141                        cls.__name__, interface))
142
143        if "xdevice" in str(instance.__class__).lower():
144            _PLUGINS.setdefault((self.plugin_type, self.plugin_id), []).append(
145                instance)
146        else:
147            _PLUGINS.setdefault((self.plugin_type, self.plugin_id), []).insert(
148                0, instance)
149
150        return cls
151
152    def get_params(self):
153        return self.params
154
155    def get_builtin_plugin(self):
156        return self._builtin_plugin
157
158
159def get_plugin(plugin_type, plugin_id=None):
160    """
161    Get plugin instance
162    :param plugin_type: plugin type
163    :param plugin_id: plugin id
164    :return:  the instance list of plugin
165    """
166    if plugin_id is None:
167        plugins = []
168        for key in _PLUGINS:
169            if key[0] != plugin_type:
170                continue
171            if not _PLUGINS.get(key):
172                continue
173            if key[1] == plugin_type:
174                plugins.insert(0, _PLUGINS.get(key)[0])
175            else:
176                plugins.append(_PLUGINS.get(key)[0])
177        return plugins
178
179    else:
180        return _PLUGINS.get((plugin_type, plugin_id), [])
181
182
183def set_plugin_params(plugin_type, plugin_id=None, **kwargs):
184    """
185    Set plugin parameters
186    :param plugin_type: plugin type
187    :param plugin_id: plugin id
188    :param kwargs: the parameters for plugin
189    :return:
190    """
191    if plugin_id is None:
192        plugin_id = plugin_type
193    plugins = get_plugin(plugin_type, plugin_id)
194    if len(plugins) == 0:
195        raise ValueError("Can not find the plugin %s" % plugin_id)
196    for plugin in plugins:
197        params = getattr(plugin, _DEFAULT_CONFIG_NAME)
198        params.update(kwargs)
199
200
201def get_all_plugins():
202    """
203    Get all plugins
204    """
205    return dict(_PLUGINS)
206
207
208def clear_plugin_cache():
209    """
210    Clear all cached plugins
211    """
212    _PLUGINS.clear()
213