• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import os
20import time
21import datetime
22import sys
23
24from abc import abstractmethod
25from abc import ABCMeta
26from xml.etree import ElementTree
27from xml.etree.ElementTree import Element
28
29from _core.interface import IFilter
30from _core.interface import IDeviceManager
31from _core.logger import platform_logger
32from _core.plugin import Plugin
33from _core.plugin import get_plugin
34from _core.utils import convert_serial
35from _core.logger import change_logger_level
36from _core.constants import ConfigConst
37from _core.constants import FilePermission
38
# Module-level logger shared by every class in this file.
LOG = platform_logger("EnvPool")

# Names exported by a star-import of this module.
__all__ = [
    "EnvPool",
    "XMLNode",
    "Selector",
    "DeviceSelector",
    "DeviceNode",
]
42
43
class EnvPool(object):
    """
    Singleton environment pool that manages the device managers used to
    obtain devices for testing.

    This class is used directly by users without going through the command
    flow: create the pool, call ``init_pool`` with a node, request devices
    with ``get_device`` and call ``shutdown`` when finished.
    """
    __instance = None
    __init_flag = False

    def __new__(cls, *args, **kwargs):
        """
        Return the single shared instance, creating it on first use.

        Constructor arguments are ignored here; ``__init__`` receives them.
        """
        del args, kwargs
        if cls.__instance is None:
            cls.__instance = super(EnvPool, cls).__new__(cls)
        return cls.__instance

    def __init__(self, log_level="info"):
        """
        Initialise the pool once; further constructions are no-ops until
        ``shutdown`` resets the initialisation flag.

        Args:
            log_level: console log level, either "debug" or "info".
        """
        if EnvPool.__init_flag:
            return
        # Create every attribute before running anything that may raise, so
        # a half-initialised singleton can still be shut down safely.
        self._managers = {}
        self._filters = {}
        self.devices = list()
        self._init_log_level(log_level)
        self._load_managers()
        # init cache file and check if expire
        cache_file = Cache()
        cache_file.check_cache_if_expire()
        # Only mark the singleton as initialised once everything succeeded,
        # so a failed start-up is retried by the next construction.
        EnvPool.__init_flag = True

    def _load_managers(self):
        """
        Instantiate every manager plugin and store it under its class name.

        A plugin that fails to instantiate is logged and skipped. The
        resulting mapping is ordered by class name, descending.
        """
        LOG.info("Load Managers ...")
        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            try:
                manager_instance = manager_plugin.__class__()
                self._managers[manager_instance.__class__.__name__] = \
                    manager_instance
            except Exception as error:
                # Best effort: one broken plugin must not stop the others.
                LOG.error("Pool start error: {}".format(error))
        # reverse sort
        if self._managers:
            self._managers = dict(sorted(self._managers.items(), reverse=True))

    def _unload_manager(self):
        """
        Empty the device list of every manager that was registered by
        ``init_pool``, drop all managers and allow re-initialisation.
        """
        for manager in self._managers.values():
            if manager.__class__.__name__ not in self._filters:
                continue
            manager.devices_list = []
        self._managers = {}
        EnvPool.__init_flag = False

    def get_device(self, selector, timeout=10):
        """
        Apply for a device matching ``selector``.

        Args:
            selector: object providing ``format()`` and ``label``.
            timeout: value forwarded to the manager's ``apply_device``.

        Returns:
            The device returned by a manager, or None if none was obtained.
            Obtained devices are remembered in ``self.devices``.
        """
        LOG.info("Get device by selector")
        device = self._apply_device(selector, timeout)
        if device is not None:
            LOG.info("Device {}: extend value: {}".format(
                convert_serial(device.device_sn), device.extend_value))
            self.devices.append(device)
        else:
            LOG.info("Require label is '{}', can't get device".format(selector.label))
        return device

    def init_pool(self, node):
        """
        Initialise every suitable manager with the environment described by
        ``node`` and register it for later device requests.

        A manager is used only if it is an ``IFilter``, accepts the node via
        ``__filter_xml_node__`` and is an ``IDeviceManager``.

        Args:
            node: object providing ``format()``; its output is handed to
                ``init_environment``.
        """
        LOG.info("Prepare to init pool")
        for manager in self._managers.values():
            if not isinstance(manager, IFilter):
                continue
            if not manager.__filter_xml_node__(node):
                continue
            if not isinstance(manager, IDeviceManager):
                continue
            manager.init_environment(node.format(), "")
            self._filters[manager.__class__.__name__] = manager
            LOG.info("Pool is prepared")
        if not self._filters:
            LOG.info("Can't find any manager, may be no connector are assign "
                     "or the plugins of manager are not installed!")

    def shutdown(self):
        """
        Release the ports of every device handed out (for devices that offer
        ``remove_ports``) and unload the managers.
        """
        # clear device rpc port
        for device in self.devices:
            if hasattr(device, "remove_ports"):
                device.remove_ports()
        self._unload_manager()

    def _apply_device(self, selector, timeout=10):
        """
        Ask each registered manager in turn for a device matching
        ``selector`` and return the first one obtained.

        Returns:
            The first truthy device returned by a manager, otherwise None.
        """
        LOG.info("Apply device in pool")
        for manager_type, manager in self._filters.items():
            if not manager.__filter_selector__(selector):
                continue
            device_option = selector.format()
            if not device_option:
                continue
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.info("'{}' not in {}'s support types".format(
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                # Without a label only the manager named "ManagerDevice"
                # is consulted.
                if manager_type != "ManagerDevice":
                    continue
            elif device_option.label not in support_labels:
                continue
            device = manager.apply_device(device_option, timeout)
            if not device:
                continue
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            return device
        return None

    @classmethod
    def _init_log_level(cls, level):
        """
        Set the console log level.

        Args:
            level: "debug" or "info", case-insensitive. Any other value is
                reported and ignored.
        """
        normalized = str(level).lower()
        if normalized not in ("debug", "info"):
            LOG.info("Level str must be 'debug' or 'info'")
            return
        # Pass on the normalised value that was actually validated.
        change_logger_level({"console": normalized})
168
169
class XMLNode(metaclass=ABCMeta):
    """
    Abstract builder of a ``<device>`` element that ``format`` wraps as
    ``<user_config><environment><device/></environment></user_config>``.

    Subclasses implement ``__on_root_attrib__`` to supply the attributes of
    the ``<device>`` element.
    """

    @abstractmethod
    def __init__(self):
        # Element that carries the device description.
        self.__device_ele = Element("device")
        # Connector names collected through build_connector().
        self.__connectors = list()

    @abstractmethod
    def __on_root_attrib__(self, attrib_dict):
        """Fill ``attrib_dict`` with the attributes of the device element."""

    def add_element_string(self, element_str=""):
        """
        Replace the device element with the one parsed from ``element_str``.

        The replacement only happens when the parsed root tag is "device";
        an empty string is ignored. Returns ``self``.
        """
        if not element_str:
            return self
        parsed = ElementTree.fromstring(element_str)
        if parsed.tag == "device":
            self.__device_ele = parsed
        return self

    @classmethod
    def create_node(cls, tag):
        """Return a new, empty element named ``tag``."""
        node = Element(tag)
        return node

    def build_connector(self, connector_name):
        """Remember ``connector_name`` and return ``self``."""
        self.__connectors.append(connector_name)
        return self

    def get_connectors(self):
        """Return the list of connector names collected so far."""
        return self.__connectors

    def format(self):
        """
        Serialise the node.

        The device element's attributes are replaced by whatever
        ``__on_root_attrib__`` provides, then the element is wrapped in
        ``environment`` and ``user_config`` elements.

        Returns:
            The output of ``ElementTree.tostring`` with utf-8 encoding.
        """
        root_attrib = {}
        self.__on_root_attrib__(root_attrib)
        self.__device_ele.attrib = root_attrib

        environment = self.create_node("environment")
        environment.append(self.__device_ele)

        document = self.create_node("user_config")
        document.append(environment)
        return ElementTree.tostring(document, encoding="utf-8")

    def get_root_node(self):
        """Return the current device element."""
        return self.__device_ele
211
212
class Selector(metaclass=ABCMeta):
    """
    Abstract description of the device a caller wants.

    Content is collected through the ``add_*`` methods and turned into a
    ``SelectionOption`` by ``format``. Subclasses implement
    ``__on_config__`` and ``__on_selection_option__`` to contribute to the
    option.
    """

    @abstractmethod
    def __init__(self, _type, label):
        # Extra key/value content describing the device.
        self.__device_dict = dict()
        # Options handed to SelectionOption, filled by __on_config__.
        self.__config = dict()
        self.label = label
        self.type = _type

    def add_environment_content(self, content):
        """
        Merge environment content into the selector.

        Args:
            content: a dict, a list whose first item is a dict, or a JSON
                string encoding either of those. An empty list contributes
                nothing. Other non-string types are ignored.

        Returns:
            ``self``.

        Raises:
            RuntimeError: if a string is neither a JSON array nor a JSON
                object literal.
        """
        _content = content
        if isinstance(_content, str):
            _content = _content.strip()
            is_array = _content.startswith("[") and _content.endswith("]")
            is_object = _content.startswith("{") and _content.endswith("}")
            if not (is_array or is_object):
                raise RuntimeError("Invalid str input! ['{}']".format(_content))
            _content = json.loads(_content)
        if isinstance(_content, list):
            # Only the first entry is used; an empty list has none.
            if _content:
                self.__device_dict.update(_content[0])
        elif isinstance(_content, dict):
            self.__device_dict.update(_content)
        return self

    @abstractmethod
    def __on_config__(self, config, device_dict):
        """Fill ``config`` with the options passed to ``SelectionOption``."""

    @abstractmethod
    def __on_selection_option__(self, selection_option):
        """Adjust the freshly built ``selection_option`` before it is returned."""

    def add_label(self, label):
        """Set the label and return ``self``."""
        self.label = label
        return self

    def add_type(self, _type):
        """Set the type and return ``self``."""
        self.type = _type
        return self

    def format(self):
        """
        Build a ``SelectionOption`` from the collected content.

        "label" (default "phone") and "type" (default "device") are taken
        out of the collected content; everything else becomes the option's
        ``extend_value``. The collected content and config are emptied
        afterwards.

        Returns:
            The new ``SelectionOption``.
        """
        if self.type or self.label:
            self.__device_dict.update({"type": self.type})
            self.__device_dict.update({"label": self.label})
        self.__on_config__(self.__config, self.__device_dict)
        label = self.__device_dict.pop("label", "phone")
        required_manager = self.__device_dict.pop("type", "device")
        device_option = SelectionOption(self.__config, label)
        device_option.required_manager = required_manager
        # Hand over a copy: the internal dict is cleared below, and sharing
        # it would leave the returned option with an empty extend_value.
        device_option.extend_value = dict(self.__device_dict)
        device_option.env_index = 1
        self.__on_selection_option__(device_option)
        self.__device_dict.clear()
        self.__config.clear()
        return device_option
274
275
class SelectionOption:
    """
    Criteria used to decide whether a device may be handed out.

    Attributes set here: ``device_sn`` (list of accepted serial numbers,
    empty meaning no restriction), ``label``, ``source_file``,
    ``extend_value``, ``required_manager`` and ``env_index``.
    """

    def __init__(self, options, label=None):
        """
        Args:
            options: mapping that may hold "device_sn", a string of serial
                numbers separated by ";". A missing, None or empty value
                means no serial restriction.
            label: device label requested by the caller.
        """
        serials = (options or {}).get("device_sn") or ""
        # Drop empty fragments produced by leading/trailing/double ";".
        self.device_sn = [x for x in serials.split(";") if x]
        self.label = label
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.env_index = None

    def get_label(self):
        """Return the requested label."""
        return self.label

    def get_env_index(self):
        """Return the environment index assigned to this option."""
        return self.env_index

    def matches(self, device):
        """
        Tell whether ``device`` satisfies this option.

        A device is rejected when its ``task_state`` attribute is present
        and falsy, or when serial numbers were requested and the device's
        ``device_sn`` is not among them.

        Returns:
            True if the device is acceptable, False otherwise.
        """
        LOG.debug("Do matches, device:{state:%s, sn:%s, label:%s}, selection "
                  "option:{device sn:%s, label:%s}" % (
                      device.device_allocation_state,
                      convert_serial(device.device_sn),
                      device.label,
                      [convert_serial(sn) if sn else "" for sn in self.device_sn],
                      self.label))
        if not getattr(device, "task_state", True):
            return False

        if self.device_sn and device.device_sn not in self.device_sn:
            return False

        return True
306
307
class DeviceNode(XMLNode):
    """
    ``XMLNode`` whose device element carries ``sn``, ``ip`` and ``port``
    children and ``type``/``label`` attributes.
    """

    def __init__(self, usb_type, label=""):
        """
        Args:
            usb_type: value written to the device element's "type" attribute.
            label: value written to the "label" attribute when non-empty.
        """
        super().__init__()
        self.usb_type = usb_type
        self.label = label
        for tag in ("sn", "ip", "port"):
            self.get_root_node().append(self.create_node(tag))

    def __on_root_attrib__(self, attrib_dict):
        """Provide the "type" attribute and, if set, the "label" attribute."""
        attrib_dict.update({"type": self.usb_type})
        if self.label:
            attrib_dict.update({"label": self.label})

    def _child(self, tag):
        """Return the ``tag`` child of the device element, creating it if absent."""
        root = self.get_root_node()
        child = root.find(tag)
        if child is None:
            # The device element may have been replaced through
            # add_element_string() by one without this child.
            child = self.create_node(tag)
            root.append(child)
        return child

    @staticmethod
    def _as_text(value):
        """Return ``value`` as element text; None stays None (no text)."""
        return None if value is None else str(value)

    def add_address(self, host, port):
        """
        Set the ``ip`` and ``port`` children.

        Args:
            host: address of the device.
            port: port of the device; converted to text so that a numeric
                port does not break serialisation.

        Returns:
            ``self``, to allow chaining like ``add_device_sn``.
        """
        self._child("ip").text = self._as_text(host)
        self._child("port").text = self._as_text(port)
        return self

    def add_device_sn(self, device_sn):
        """
        Add a serial number to the ``sn`` child; several serial numbers are
        joined with ";".

        Returns:
            ``self``.
        """
        sn_ele = self._child("sn")
        if sn_ele.text:
            sn_ele.text = "{};{}".format(sn_ele.text, device_sn)
        else:
            sn_ele.text = device_sn
        return self
336
337
class DeviceSelector(Selector):
    """``Selector`` that can additionally restrict the choice by serial number."""

    def __init__(self, _type="", label=""):
        """Create a selector with the given type and label and no serial number."""
        super().__init__(_type, label)
        self.device_sn = ""

    def __on_config__(self, config, device_dict):
        """Publish the serial number under the "device_sn" config key."""
        config["device_sn"] = self.device_sn

    def __on_selection_option__(self, selection_option):
        """Nothing to adjust on the selection option."""
        return None

    def add_device_sn(self, device_sn):
        """Store ``device_sn`` and return ``self``."""
        self.device_sn = device_sn
        return self
353
354
class Cache:
    """
    Marker file whose modification time decides whether the pool runs in
    cache mode.
    """

    def __init__(self):
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from xdevice import Variables
        self.cache_file = os.path.join(Variables.res_dir, "cache.dat")
        self.expire_time = 1  # days

    def check_cache_if_expire(self):
        """
        Record on ``sys`` (attribute named by ``ConfigConst.env_pool_cache``)
        whether the cache file is still fresh.

        The flag is True when the file exists and is younger than
        ``expire_time`` days. Otherwise the file is rewritten and the flag
        is set to False.
        """
        try:
            current_modify_time = os.path.getmtime(self.cache_file)
        except OSError:
            # Missing or unreadable file: treat as expired.
            current_modify_time = None
        if current_modify_time is not None:
            current_time = time.time()
            if Cache.get_delta_days(current_modify_time, current_time) < self.expire_time:
                setattr(sys, ConfigConst.env_pool_cache, True)
                LOG.info("Env pool running in cache mode.")
                return
        self.update_cache()
        setattr(sys, ConfigConst.env_pool_cache, False)
        LOG.info("Env pool running in normal mode.")

    @staticmethod
    def get_delta_days(t1, t2):
        """
        Return the number of whole days from timestamp ``t1`` to ``t2``.

        Args:
            t1: earlier time, seconds since the epoch.
            t2: later time, seconds since the epoch.

        Returns:
            Whole days elapsed, rounded towards negative infinity.
        """
        # Work on the raw difference so the result does not depend on the
        # local timezone or daylight-saving changes.
        return datetime.timedelta(seconds=t2 - t1).days

    def update_cache(self):
        """Create the cache file if needed and append to it, refreshing its mtime."""
        # os.O_BINARY only exists on Windows; elsewhere it is not needed.
        flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND | getattr(os, "O_BINARY", 0)
        with os.fdopen(os.open(self.cache_file, flags, FilePermission.mode_755),
                       "wb") as f:
            f.write(b'123')
384