• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from dataclasses import dataclass
20
21from _core.config.config_manager import UserConfigManager
22from _core.logger import platform_logger
23from _core.logger import change_logger_level
24from _core.plugin import Plugin
25from _core.plugin import get_plugin
26from _core.utils import convert_serial
27from _core.constants import ProductForm
28from _core.constants import ConfigConst
29from _core.environment.device_state import DeviceAllocationState
30
31__all__ = ["EnvironmentManager", "DeviceSelectionOption", "Environment"]
32
33LOG = platform_logger("ManagerEnv")
34
35
36class Environment(object):
37    """
38    Environment required for each dispatch
39    """
40    device_mapper = {
41        ProductForm.phone: "Phone",
42        ProductForm.tablet: "Tablet",
43        ProductForm.car: "Car",
44        ProductForm.television: "Tv",
45        ProductForm.watch: "Watch",
46        }
47
48    def __init__(self):
49        self.devices = []
50        self.phone = 0
51        self.wifiiot = 0
52        self.ipcamera = 0
53        self.device_recorder = dict()
54
55    def __get_serial__(self):
56        device_serials = []
57        for device in self.devices:
58            device_serials.append(convert_serial(device.__get_serial__()))
59        return ";".join(device_serials)
60
61    def get_devices(self):
62        return self.devices
63
64    def get_description(self):
65        descriptions = []
66        for d in self.devices:
67            try:
68                descriptions.append(d.device_description)
69            except Exception as e:
70                LOG.error(f"get device description error: {e}")
71        return descriptions
72
73    def check_serial(self):
74        if self.__get_serial__():
75            return True
76        return False
77
78    def add_device(self, device, index=None):
79        label = self.device_mapper.get(device.label, "DUT")
80        if index:
81            current = index
82        else:
83            current = self.device_recorder.get(label, 0) + 1
84        device.device_id = "%s%s" % (label, current)
85        LOG.debug("add_device, sn: {}, id: {}".format(device.device_sn, device.device_id))
86        self.device_recorder.update({label: current})
87        self.devices.append(device)
88
89
90class EnvironmentManager(object):
91    """
92    Class representing environment manager that
93    managing the set of available devices for testing
94    """
95    __instance = None
96    __init_flag = False
97
98    def __new__(cls, *args, **kwargs):
99        """
100        Singleton instance
101        """
102        del args, kwargs
103        if cls.__instance is None:
104            cls.__instance = super(EnvironmentManager, cls).__new__(cls)
105        return cls.__instance
106
107    def __init__(self, environment="", user_config_file=""):
108        if EnvironmentManager.__init_flag:
109            return
110        self.managers = {}
111        self.env_start(environment, user_config_file)
112        EnvironmentManager.__init_flag = True
113
114    def env_start(self, environment="", user_config_file=""):
115
116        log_level_dict = UserConfigManager(
117            config_file=user_config_file, env=environment).get_log_level()
118        if log_level_dict:
119            # change log level when load or reset EnvironmentManager object
120            change_logger_level(log_level_dict)
121
122        manager_plugins = get_plugin(Plugin.MANAGER)
123        for manager_plugin in manager_plugins:
124            try:
125                manager_instance = manager_plugin.__class__()
126                manager_instance.init_environment(environment,
127                                                  user_config_file)
128                self.managers[manager_instance.__class__.__name__] = \
129                    manager_instance
130            except Exception as error:
131                LOG.debug("Env start error: %s" % error)
132        if len(self.managers):
133            self.managers = dict(sorted(self.managers.items(), reverse=True))
134
135    def env_stop(self):
136        for manager in self.managers.values():
137            manager.env_stop()
138            manager.devices_list = []
139        self.managers = {}
140
141        EnvironmentManager.__init_flag = False
142
143    def apply_environment(self, device_options):
144        environment = Environment()
145        for device_option in device_options:
146            LOG.debug("Visit options to find device")
147            device = self.apply_device(device_option)
148            if device is not None:
149                index = self.get_config_device_index(device)
150                environment.add_device(device, index)
151                device.extend_value = device_option.extend_value
152                LOG.debug("Device %s: extend value: %s", convert_serial(
153                    device.device_sn), device.extend_value)
154                if hasattr(device, "extend_device_props"):
155                    device.extend_device_props()
156                device.init_description()
157            else:
158                LOG.debug("Require label is '%s', then next" %
159                          device_option.label)
160        return environment
161
162    def release_environment(self, environment):
163        for device in environment.devices:
164            device.extend_value = {}
165            self.release_device(device)
166
167    def reset_environment(self, used_devices):
168        for _, device in used_devices.items():
169            self.reset_device(device)
170
171    def apply_device(self, device_option, timeout=3):
172        LOG.debug("Apply device from managers:%s" % self.managers)
173        for manager_type, manager in self.managers.items():
174            support_labels = getattr(manager, "support_labels", [])
175            support_types = getattr(manager, "support_types", [])
176            if device_option.required_manager not in support_types:
177                LOG.warning("'%s' not in %s's support types" % (
178                    device_option.required_manager, manager_type))
179                continue
180            if not support_labels:
181                continue
182            if device_option.label is None:
183                if manager_type != "ManagerDevice":
184                    continue
185            else:
186                if support_labels and \
187                        device_option.label not in support_labels:
188                    continue
189            device = manager.apply_device(device_option, timeout)
190            if hasattr(device, "env_index"):
191                device.env_index = device_option.get_env_index()
192            if device:
193                return device
194        else:
195            return None
196
197    def get_config_device_index(self, device):
198        if device and hasattr(device, "device_sn"):
199            sn = device.device_sn
200            for manager in self.managers.items():
201                if hasattr(manager[1], "global_device_filter"):
202                    index = 1
203                    for s in manager[1].global_device_filter:
204                        if s == sn:
205                            return index
206                        else:
207                            index += 1
208        return None
209
210    def check_device_exist(self, device_options):
211        """
212        Check if there are matched devices which can be allocated or available.
213        """
214        devices = []
215        for device_option in device_options:
216            for manager_type, manager in self.managers.items():
217                support_labels = getattr(manager, "support_labels", [])
218                support_types = getattr(manager, "support_types", [])
219                if device_option.required_manager not in support_types:
220                    continue
221                if device_option.label is None:
222                    if manager_type != "ManagerDevice":
223                        continue
224                else:
225                    if support_labels and \
226                            device_option.label not in support_labels:
227                        continue
228                for device in manager.devices_list:
229                    if device.device_sn in devices:
230                        continue
231                    if device_option.matches(device, False):
232                        devices.append(device.device_sn)
233                        break
234                else:
235                    continue
236                break
237            else:
238                return False
239        return True
240
241    def release_device(self, device):
242        for manager in self.managers.values():
243            if device in manager.devices_list:
244                manager.release_device(device)
245
246    def reset_device(self, device):
247        for manager in self.managers.values():
248            if device in manager.devices_list:
249                manager.reset_device(device)
250
251    def list_devices(self):
252        LOG.info("List devices.")
253        for manager in self.managers.values():
254            manager.list_devices()
255
256
257class DeviceSelectionOption(object):
258    """
259    Class representing device selection option
260    """
261
262    def __init__(self, options, label=None, test_source=None):
263        self.device_sn = [x for x in options["device_sn"].split(";") if x]
264        self.label = label
265        self.test_driver = test_source.test_type
266        self.source_file = ""
267        self.extend_value = {}
268        self.required_manager = ""
269        self.required_component = ""
270        self.env_index = None
271
272    def get_label(self):
273        return self.label
274
275    def get_env_index(self):
276        return self.env_index
277
278    def matches(self, device, allocate=True):
279        LOG.debug("Do matches, device:{state:%s, sn:%s, label:%s}, selection "
280                  "option:{device sn:%s, label:%s}" % (
281                   device.device_allocation_state,
282                   convert_serial(device.device_sn),
283                   device.label,
284                   [convert_serial(sn) if sn else "" for sn in self.device_sn],
285                   self.label))
286        if not getattr(device, "task_state", True):
287            return False
288        if allocate and device.device_allocation_state != \
289                DeviceAllocationState.available:
290            return False
291
292        if not allocate:
293            if device.device_allocation_state != \
294                    DeviceAllocationState.available and \
295                    device.device_allocation_state != \
296                    DeviceAllocationState.allocated:
297                return False
298
299        if len(self.device_sn) != 0 and device.device_sn not in self.device_sn:
300            return False
301
302        if self.label and self.label != device.label:
303            return False
304        if self.required_component and \
305                hasattr(device, ConfigConst.support_component):
306            subsystems, parts = getattr(device, ConfigConst.support_component)
307            required_subsystems, require_part = self.required_component
308            if required_subsystems not in subsystems and \
309                    require_part not in parts:
310                return False
311        return True
312