• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from dataclasses import dataclass
20
21from _core.config.config_manager import UserConfigManager
22from _core.logger import platform_logger
23from _core.logger import change_logger_level
24from _core.plugin import Plugin
25from _core.plugin import get_plugin
26from _core.utils import convert_serial
27from _core.constants import ProductForm
28from _core.constants import ConfigConst
29from _core.environment.device_state import DeviceAllocationState
30
31__all__ = ["EnvironmentManager", "DeviceSelectionOption", "Environment"]
32
33LOG = platform_logger("ManagerEnv")
34
35
class Environment(object):
    """
    Collection of devices handed out for a single dispatch.

    Every device added receives a ``device_id`` made of a label prefix
    (looked up in ``device_mapper``, falling back to "DUT") followed by a
    number that is tracked per prefix in ``device_recorder``.
    """
    device_mapper = {
        ProductForm.phone: "Phone",
        ProductForm.tablet: "Tablet",
        ProductForm.car: "Car",
        ProductForm.television: "Tv",
        ProductForm.watch: "Watch",
    }

    def __init__(self):
        self.devices = []
        self.phone = 0
        self.wifiiot = 0
        self.ipcamera = 0
        # Maps an id prefix (e.g. "Phone") to the last number handed out.
        self.device_recorder = dict()

    def __get_serial__(self):
        """
        Return the converted serials of all devices joined with ";".
        """
        return ";".join(
            convert_serial(item.__get_serial__()) for item in self.devices)

    def get_devices(self):
        """
        Return the list of devices held by this environment.
        """
        return self.devices

    def check_serial(self):
        """
        Return True when the joined serial string is not empty.
        """
        return bool(self.__get_serial__())

    def add_device(self, device, index=None):
        """
        Assign ``device.device_id`` and append the device to the environment.

        A truthy ``index`` is used as the id number; otherwise the number
        following the last one recorded for the same prefix is used.
        """
        prefix = self.device_mapper.get(device.label, "DUT")
        number = index if index else self.device_recorder.get(prefix, 0) + 1
        device.device_id = "%s%s" % (prefix, number)
        LOG.debug("add_device, sn: {}, id: {}".format(
            device.device_sn, device.device_id))
        self.device_recorder[prefix] = number
        self.devices.append(device)
79
80
class EnvironmentManager(object):
    """
    Class representing environment manager that
    managing the set of available devices for testing.

    The class is a singleton: every construction returns the same object,
    and ``__init__`` only starts the environment while the class-level
    init flag is unset (``env_stop`` unsets it again).
    """
    __instance = None
    __init_flag = False

    def __new__(cls, *args, **kwargs):
        """
        Singleton instance
        """
        del args, kwargs
        if cls.__instance is None:
            cls.__instance = super(EnvironmentManager, cls).__new__(cls)
        return cls.__instance

    def __init__(self, environment="", user_config_file=""):
        """
        Start the environment once; later constructions are no-ops until
        ``env_stop`` has been called.
        """
        if EnvironmentManager.__init_flag:
            return
        # Maps manager class name -> manager instance.
        self.managers = {}
        self.env_start(environment, user_config_file)
        EnvironmentManager.__init_flag = True

    def env_start(self, environment="", user_config_file=""):
        """
        Apply the configured log level and initialise every manager plugin.

        A plugin whose construction or ``init_environment`` call raises is
        logged at debug level and skipped. The resulting managers are
        re-ordered by class name, descending.
        """
        log_level_dict = UserConfigManager(
            config_file=user_config_file, env=environment).get_log_level()
        if log_level_dict:
            # change log level when load or reset EnvironmentManager object
            change_logger_level(log_level_dict)

        for manager_plugin in get_plugin(Plugin.MANAGER):
            try:
                # Build a fresh instance of the plugin's class rather than
                # reusing the plugin object itself.
                manager_instance = manager_plugin.__class__()
                manager_instance.init_environment(environment,
                                                  user_config_file)
                self.managers[manager_instance.__class__.__name__] = \
                    manager_instance
            except Exception as error:
                LOG.debug("Env start error: %s" % error)
        if self.managers:
            self.managers = dict(sorted(self.managers.items(), reverse=True))

    def env_stop(self):
        """
        Stop every manager and mark the singleton as uninitialised.

        If a manager raises while stopping, the exception still propagates,
        but the manager table and the init flag are reset regardless so the
        next construction starts the environment again instead of silently
        reusing a half-stopped one.
        """
        try:
            for manager in self.managers.values():
                manager.env_stop()
                manager.devices_list = []
        finally:
            self.managers = {}
            EnvironmentManager.__init_flag = False

    def apply_environment(self, device_options):
        """
        Build an Environment holding one device per satisfiable option.

        Options for which no device could be applied are skipped.
        """
        environment = Environment()
        for device_option in device_options:
            LOG.debug("Visit options to find device")
            device = self.apply_device(device_option)
            if device is None:
                LOG.debug("Require label is '%s', then next" %
                          device_option.label)
                continue
            index = self.get_config_device_index(device)
            environment.add_device(device, index)
            device.extend_value = device_option.extend_value
            LOG.debug("Device %s: extend value: %s", convert_serial(
                device.device_sn), device.extend_value)
        return environment

    def release_environment(self, environment):
        """
        Clear ``extend_value`` on and release every device of environment.
        """
        for device in environment.devices:
            device.extend_value = {}
            self.release_device(device)

    def reset_environment(self, used_devices):
        """
        Reset every device stored as a value of the used_devices mapping.
        """
        for device in used_devices.values():
            self.reset_device(device)

    def apply_device(self, device_option, timeout=10):
        """
        Ask the managers, in order, for a device matching device_option.

        A manager is consulted only when it supports the option's required
        manager type, declares at least one supported label, and either the
        option's label is among them or, for an option without label, the
        manager is "ManagerDevice" or "ManagerAospDevice".

        Returns the first truthy device a manager hands out, else None.
        """
        LOG.debug("Apply device from managers:%s" % self.managers)
        for manager_type, manager in self.managers.items():
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.warning("'%s' not in %s's support types" % (
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                if manager_type not in ("ManagerDevice",
                                        "ManagerAospDevice"):
                    continue
            elif device_option.label not in support_labels:
                continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        return None

    def get_config_device_index(self, device):
        """
        Return the 1-based position of the device's sn in the first
        manager ``global_device_filter`` that contains it, else None.
        """
        if not device or not hasattr(device, "device_sn"):
            return None
        sn = device.device_sn
        for manager in self.managers.values():
            if not hasattr(manager, "global_device_filter"):
                continue
            for position, filter_sn in enumerate(
                    manager.global_device_filter, start=1):
                if filter_sn == sn:
                    return position
        return None

    def check_device_exist(self, device_options):
        """
        Check if there are matched devices which can be allocated or available.

        Each option must be matched by a device that was not already
        counted for an earlier option; returns False as soon as one option
        cannot be matched.
        """
        claimed = []
        for device_option in device_options:
            for manager_type, manager in self.managers.items():
                support_labels = getattr(manager, "support_labels", [])
                support_types = getattr(manager, "support_types", [])
                if device_option.required_manager not in support_types:
                    continue
                if device_option.label is None:
                    if manager_type not in ("ManagerDevice",
                                            "ManagerAospDevice"):
                        continue
                elif support_labels and \
                        device_option.label not in support_labels:
                    continue
                for device in manager.devices_list:
                    if device.device_sn in claimed:
                        continue
                    if device_option.matches(device, False):
                        claimed.append(device.device_sn)
                        break
                else:
                    # Nothing usable in this manager, try the next one.
                    continue
                # A device was claimed for this option, stop searching.
                break
            else:
                # Every manager was tried without finding a device.
                return False
        return True

    def release_device(self, device):
        """
        Release device through every manager that lists it.
        """
        for manager in self.managers.values():
            if device in manager.devices_list:
                manager.release_device(device)

    def reset_device(self, device):
        """
        Reset device through every manager that lists it.
        """
        for manager in self.managers.values():
            if device in manager.devices_list:
                manager.reset_device(device)

    def list_devices(self):
        """
        Ask every manager to list its devices.
        """
        LOG.info("List devices.")
        for manager in self.managers.values():
            manager.list_devices()
245
246
class DeviceSelectionOption(object):
    """
    Class representing device selection option
    """

    def __init__(self, options, label=None, test_source=None):
        """
        Build a selection option.

        options: mapping whose "device_sn" entry is a ";"-separated string
            of serial numbers; empty segments are dropped.
        label: required device label, or None for no label constraint.
        test_source: object exposing ``test_type``; when omitted,
            ``test_driver`` is set to None.
        """
        self.device_sn = [x for x in options["device_sn"].split(";") if x]
        self.label = label
        # test_source defaults to None, so it must not be dereferenced
        # unconditionally.
        self.test_driver = \
            None if test_source is None else test_source.test_type
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.required_component = ""
        self.env_index = None

    def get_label(self):
        """
        Return the required device label.
        """
        return self.label

    def get_env_index(self):
        """
        Return the environment index stored on this option.
        """
        return self.env_index

    def matches(self, device, allocate=True):
        """
        Tell whether device satisfies this option.

        With ``allocate`` true the device must be in the available state;
        otherwise the available and allocated states are both accepted.
        The device must also have a truthy ``task_state`` (when it has
        one), its sn must be listed in ``device_sn`` if that list is not
        empty, and its label must equal ``label`` if one is set. When a
        required component is set and the device exposes its supported
        components, at least one of the required subsystem or part must be
        supported.
        """
        state = device.device_allocation_state
        LOG.debug("Do matches, device:{state:%s, sn:%s, label:%s}, selection "
                  "option:{device sn:%s, label:%s}" % (
                   state,
                   convert_serial(device.device_sn),
                   device.label,
                   [convert_serial(sn) if sn else "" for sn in self.device_sn],
                   self.label))
        if not getattr(device, "task_state", True):
            return False

        if allocate:
            if state != DeviceAllocationState.available:
                return False
        elif state != DeviceAllocationState.available and \
                state != DeviceAllocationState.allocated:
            return False

        if self.device_sn and device.device_sn not in self.device_sn:
            return False

        if self.label and self.label != device.label:
            return False

        if self.required_component and \
                hasattr(device, ConfigConst.support_component):
            subsystems, parts = getattr(device, ConfigConst.support_component)
            required_subsystems, require_part = self.required_component
            # Rejected only when neither the subsystem nor the part is
            # supported by the device.
            if required_subsystems not in subsystems and \
                    require_part not in parts:
                return False
        return True
302