• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from dataclasses import dataclass
20
21from _core.config.config_manager import UserConfigManager
22from _core.logger import platform_logger
23from _core.logger import change_logger_level
24from _core.plugin import Plugin
25from _core.plugin import get_plugin
26from _core.utils import convert_serial
27from _core.constants import ConfigConst
28
29__all__ = ["EnvironmentManager", "DeviceSelectionOption",
30           "DeviceAllocationState", "Environment"]
31
32LOG = platform_logger("ManagerEnv")
33
34
35class Environment(object):
36    """
37    Environment required for each dispatch
38    """
39
40    def __init__(self):
41        self.devices = []
42        self.phone = 0
43        self.wifiiot = 0
44        self.ipcamera = 0
45
46    def __get_serial__(self):
47        device_serials = []
48        for device in self.devices:
49            device_serials.append(convert_serial(device.__get_serial__()))
50        return ";".join(device_serials)
51
52    def get_devices(self):
53        return self.devices
54
55    def check_serial(self):
56        if self.__get_serial__():
57            return True
58        return False
59
60    def add_device(self, device):
61        if device.label == "phone":
62            self.phone += 1
63            device.device_id = "Phone%s" % str(self.phone)
64        self.devices.append(device)
65
66
67class EnvironmentManager(object):
68    """
69    Class representing environment manager
70    managing the set of available devices for testing
71    """
72    __instance = None
73    __init_flag = False
74
75    def __new__(cls, *args, **kwargs):
76        """
77        Singleton instance
78        """
79        del args, kwargs
80        if cls.__instance is None:
81            cls.__instance = super(EnvironmentManager, cls).__new__(cls)
82        return cls.__instance
83
84    def __init__(self, environment="", user_config_file=""):
85        if EnvironmentManager.__init_flag:
86            return
87        self.managers = {}
88        self.env_start(environment, user_config_file)
89        EnvironmentManager.__init_flag = True
90
91    def env_start(self, environment="", user_config_file=""):
92
93        log_level_dict = UserConfigManager(
94            config_file=user_config_file, env=environment).get_log_level()
95
96        if log_level_dict:
97            # change log level when load or reset EnvironmentManager object
98            change_logger_level(log_level_dict)
99
100        manager_plugins = get_plugin(Plugin.MANAGER)
101        for manager_plugin in manager_plugins:
102            try:
103                manager_instance = manager_plugin.__class__()
104                manager_instance.init_environment(environment,
105                                                  user_config_file)
106                self.managers[manager_instance.__class__.__name__] = \
107                    manager_instance
108            except Exception as error:
109                LOG.debug("Env start error:%s" % error)
110
111    def env_stop(self):
112        for manager in self.managers.values():
113            manager.env_stop()
114            manager.devices_list = []
115        self.managers = {}
116
117        EnvironmentManager.__init_flag = False
118
119    def apply_environment(self, device_options):
120        environment = Environment()
121        for device_option in device_options:
122            device = self.apply_device(device_option)
123            if device is not None:
124                environment.add_device(device)
125                device.extend_value = device_option.extend_value
126                LOG.debug("device %s: extend value: %s", device.device_sn,
127                          device.extend_value)
128
129        return environment
130
131    def release_environment(self, environment):
132        for device in environment.devices:
133            device.extend_value = {}
134            self.release_device(device)
135
136    def apply_device(self, device_option, timeout=10):
137        for manager_type, manager in self.managers.items():
138            support_labels = getattr(manager, "support_labels", [])
139            if not support_labels:
140                continue
141            if device_option.label is None:
142                if manager_type != "ManagerDevice":
143                    continue
144            else:
145                if support_labels and \
146                        device_option.label not in support_labels:
147                    continue
148            device = manager.apply_device(device_option, timeout)
149            if device:
150                return device
151        else:
152            return None
153
154    def check_device_exist(self, device_options):
155        """
156        Check if there are matched devices which can be allocated or available.
157        """
158        devices = []
159        for device_option in device_options:
160            for manager_type, manager in self.managers.items():
161                support_labels = getattr(manager, "support_labels", [])
162                if device_option.label is None:
163                    if manager_type != "ManagerDevice":
164                        continue
165                else:
166                    if support_labels and \
167                            device_option.label not in support_labels:
168                        continue
169                for device in manager.devices_list:
170                    if device.device_sn in devices:
171                        continue
172                    if device_option.matches(device, False):
173                        devices.append(device.device_sn)
174                        break
175                else:
176                    continue
177                break
178            else:
179                return False
180        return True
181
182    def release_device(self, device):
183        for manager in self.managers.values():
184            if device in manager.devices_list:
185                manager.release_device(device)
186
187    def list_devices(self):
188        LOG.info("list devices.")
189        for manager in self.managers.values():
190            manager.list_devices()
191
192
193class DeviceSelectionOption(object):
194    """
195    Class representing device selection option
196    """
197
198    def __init__(self, options, label=None, test_source=None):
199        self.device_sn = [x for x in options["device_sn"].split(";") if x]
200        self.label = label
201        self.test_driver = test_source.test_type
202        self.source_file = ""
203        self.extend_value = {}
204        self.required_component = ""
205
206    def get_label(self):
207        return self.label
208
209    def matches(self, device, allocate=True):
210        if not getattr(device, "task_state", True):
211            return False
212        if allocate and device.device_allocation_state != \
213                DeviceAllocationState.available:
214            return False
215
216        if not allocate:
217            if device.device_allocation_state != \
218                    DeviceAllocationState.available and \
219                    device.device_allocation_state != \
220                    DeviceAllocationState.allocated:
221                return False
222
223        if len(self.device_sn) != 0 and device.device_sn not in self.device_sn:
224            return False
225
226        if self.label and self.label != device.label:
227            return False
228
229        if self.required_component and \
230                hasattr(device, ConfigConst.support_component):
231            subsystems, parts = getattr(device, ConfigConst.support_component)
232            required_subsystems, require_part = self.required_component
233            if required_subsystems not in subsystems and \
234                    require_part not in parts:
235                return False
236
237        return True
238
239
240@dataclass
241class DeviceAllocationState:
242    ignored = "Ignored"
243    available = "Available"
244    allocated = "Allocated"
245    unusable = "Unusable"
246