• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17import json
18import os
19import time
20import sys
21
22from abc import abstractmethod
23from abc import ABCMeta
24from xml.etree import ElementTree
25from xml.etree.ElementTree import Element
26
27
28from _core.interface import IFilter
29from _core.interface import IDeviceManager
30from _core.logger import platform_logger
31from _core.plugin import Plugin
32from _core.plugin import get_plugin
33from _core.utils import convert_serial
34from _core.utils import get_cst_time
35from _core.logger import change_logger_level
36from _core.constants import ConfigConst
37from _core.constants import FilePermission
38from _core.executor.scheduler import Scheduler
39
# Logger used by every class and function in this module; created with the name "EnvPool".
LOG = platform_logger("EnvPool")

# Names exported by a star-import of this module.
__all__ = ["EnvPool", "XMLNode", "Selector", "DeviceSelector", "DeviceNode", "is_env_pool_run_mode"]
43
44
class EnvPool(object):
    """
    Class representing environment pool that
    manages the set of available devices for testing.
    This class is used directly by users without going through the command flow.

    The class is a singleton: ``__new__`` always hands back the same object
    and ``__init__`` only performs the expensive setup (manager loading,
    cache check) while the class-level init flag is unset.
    """
    # The single shared instance, created lazily by __new__.
    instance = None
    # True once the managers have been loaded; reset by _unload_manager().
    __init_flag = False
    # Folder the task log is written to; refreshed on every __init__ call.
    report_path = None
    # Resource folder, also published as ``sys.ecotest_resource_path``.
    resource_path = None

    def __new__(cls, *args, **kwargs):
        """
        Singleton instance
        """
        del args, kwargs
        if cls.instance is None:
            cls.instance = super(EnvPool, cls).__new__(cls)
        return cls.instance

    def __init__(self, **kwargs):
        """
        Restart the task log and, on first use, load the device managers.

        Keyword arguments read here:
            report_path: folder for the task log (a time-stamped folder under
                ``<cwd>/reports`` is used when empty or missing).
            log_level: console log level, "debug" or "info" (default "info").
            resource_path: resource folder (default ``<cwd>/resource``).
        """
        EnvPool.report_path = kwargs.get("report_path", "")
        # The task log is restarted on every construction, even when the
        # managers are already loaded.
        self._stop_task_log()
        self._start_task_log()
        if EnvPool.__init_flag:
            return
        self._managers = {}
        self._filters = {}
        self._init_log_level(kwargs.get("log_level", "info"))
        self._load_managers()
        EnvPool.__init_flag = True
        EnvPool.resource_path = kwargs.get("resource_path",
                                           os.path.join(os.path.abspath(os.getcwd()), "resource"))
        setattr(sys, "ecotest_resource_path", EnvPool.resource_path)
        # init cache file and check if expire
        cache_file = Cache()
        cache_file.check_cache_if_expire()
        self.devices = list()

    def _load_managers(self):
        """
        Instantiate every manager plugin and index it by class name.

        A plugin whose constructor raises is logged and skipped so that one
        broken plugin does not prevent the others from loading.
        """
        LOG.info("Load Managers ...")
        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            try:
                manager_instance = manager_plugin.__class__()
                self._managers[manager_instance.__class__.__name__] = \
                    manager_instance
            except Exception as error:
                LOG.error("Pool start error: {}".format(error))
        # reverse sort
        if self._managers:
            self._managers = dict(sorted(self._managers.items(), reverse=True))

    def _unload_manager(self):
        """
        Drop all managers and allow the next construction to reload them.

        Only managers that were registered by init_pool() get their
        ``devices_list`` emptied.
        """
        for manager in self._managers.values():
            if manager.__class__.__name__ not in self._filters:
                continue
            manager.devices_list = []
        self._managers = {}
        EnvPool.__init_flag = False

    def get_device(self, selector, timeout=10):
        """
        Apply for a device matching ``selector``.

        Args:
            selector: selector object describing the wanted device.
            timeout: value forwarded to the manager's ``apply_device``.

        Returns:
            The device handed out by a manager (also remembered in
            ``self.devices``), or None when no manager supplied one.
        """
        LOG.info("Get device by selector")
        device = self._apply_device(selector, timeout)
        if device is not None:
            LOG.info("Device {}: extend value: {}".format(
                convert_serial(device.device_sn), device.extend_value))
            self.devices.append(device)
        else:
            LOG.info("Require label is '{}', can't get device".
                     format(selector.label))
        return device

    def init_pool(self, node):
        """
        Initialise every loaded manager that accepts the given XML node.

        A manager is used only when it is both an IFilter and an
        IDeviceManager and its ``__filter_xml_node__`` accepts ``node``.

        Args:
            node: object providing ``format()``; its result is passed to the
                manager's ``init_environment``.
        """
        LOG.info("Prepare to init pool")
        for manager in self._managers.values():
            if not isinstance(manager, IFilter):
                continue
            if not manager.__filter_xml_node__(node):
                continue
            if not isinstance(manager, IDeviceManager):
                continue
            manager.init_environment(node.format(), "")
            self._filters[manager.__class__.__name__] = manager
            LOG.info("Pool is prepared")
        if not self._filters:
            # The trailing space keeps the two literals from fusing into
            # "assignor" in the emitted message.
            LOG.info("Can't find any manager, may be no connector are assign "
                     "or the plugins of manager are not installed!")

    def shutdown(self):
        """
        Release device ports, unload the managers and stop the task log.

        The managers are unloaded and the log is stopped even if releasing a
        device's ports raises; the exception still propagates to the caller.
        """
        try:
            # clear device rpc port
            for device in self.devices:
                if hasattr(device, "remove_ports"):
                    device.remove_ports()
        finally:
            self._unload_manager()
            self._stop_task_log()

    def _apply_device(self, selector, timeout=3):
        """
        Ask each registered manager in turn for a device matching ``selector``.

        Returns:
            The first truthy device returned by a manager, otherwise None.
        """
        LOG.info("Apply device in pool")
        # selector.format() clears the selector's accumulated state, so the
        # option is built once and shared by all managers instead of being
        # rebuilt (with less data) for every manager after the first.
        device_option = None
        for manager_type, manager in self._filters.items():
            if not manager.__filter_selector__(selector):
                continue
            if device_option is None:
                device_option = selector.format()
            if not device_option:
                continue
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.info("'{}' not in {}'s support types".format(
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                # Without a label only the manager named "ManagerDevice"
                # is considered.
                if manager_type != "ManagerDevice":
                    continue
            elif device_option.label not in support_labels:
                continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        return None

    @classmethod
    def _init_log_level(cls, level):
        """
        Set the console log level.

        Args:
            level: "debug" or "info", compared case-insensitively. Any other
                value is reported and ignored.
        """
        normalized = str(level).lower()
        if normalized not in ("debug", "info"):
            LOG.info("Level str must be 'debug' or 'info'")
            return
        # Forward the same normalised value that was validated above.
        change_logger_level({"console": normalized})

    @classmethod
    def _start_task_log(cls):
        """
        Make sure the report folder exists and start the task log in it.

        When ``EnvPool.report_path`` is empty a time-stamped folder below
        ``<cwd>/reports`` is used and stored back into ``report_path``.
        """
        report_folder_path = EnvPool.report_path
        if not report_folder_path:
            report_folder_path = os.path.join(
                os.path.abspath(os.getcwd()), "reports", get_cst_time().strftime("%Y-%m-%d-%H-%M-%S"))
        if not os.path.exists(report_folder_path):
            # exist_ok tolerates another process creating the folder between
            # the existence check and this call.
            os.makedirs(report_folder_path, exist_ok=True)
        LOG.info("Report path: {}".format(report_folder_path))
        EnvPool.report_path = report_folder_path
        Scheduler.start_task_log(report_folder_path)

    @classmethod
    def _stop_task_log(cls):
        """
        Stop the task log through the Scheduler.
        """
        Scheduler.stop_task_logcat()
195
196
class XMLNode(metaclass=ABCMeta):
    """
    Base class for builders of the ``<device>`` part of a user config.

    Subclasses supply the attributes of the ``<device>`` element through
    ``__on_root_attrib__``; ``format()`` wraps that element in
    ``<user_config><environment>`` and serialises the result.
    """

    @abstractmethod
    def __init__(self):
        # Connector names collected through build_connector().
        self.__connectors = []
        # The <device> element all other methods operate on.
        self.__device_ele = Element("device")

    @abstractmethod
    def __on_root_attrib__(self, attrib_dict):
        """
        Fill ``attrib_dict`` with the attributes of the ``<device>`` element.
        """

    def add_element_string(self, element_str=""):
        """
        Replace the ``<device>`` element with one parsed from ``element_str``.

        Empty input, or XML whose root tag is not "device", leaves the
        current element untouched. Returns ``self`` for chaining.
        """
        if not element_str:
            return self
        parsed = ElementTree.fromstring(element_str)
        if parsed.tag == "device":
            self.__device_ele = parsed
        return self

    @classmethod
    def create_node(cls, tag):
        """
        Return a new, empty element with the given tag.
        """
        return Element(tag)

    def build_connector(self, connector_name):
        """
        Remember ``connector_name`` and return ``self`` for chaining.
        """
        self.__connectors.append(connector_name)
        return self

    def get_connectors(self):
        """
        Return the list of connector names added so far.
        """
        return self.__connectors

    def format(self):
        """
        Serialise the configuration as UTF-8 encoded XML bytes.

        The attributes of the ``<device>`` element are replaced by whatever
        ``__on_root_attrib__`` provides.
        """
        root_attrs = {}
        self.__on_root_attrib__(root_attrs)
        self.__device_ele.attrib = root_attrs

        environment = self.create_node("environment")
        environment.append(self.__device_ele)

        user_config = self.create_node("user_config")
        user_config.append(environment)
        return ElementTree.tostring(user_config, encoding="utf-8")

    def get_root_node(self):
        """
        Return the ``<device>`` element.
        """
        return self.__device_ele
238
239
class Selector(metaclass=ABCMeta):
    """
    Base class for objects that describe which device is wanted.

    Environment content, a type and a label are collected through the
    ``add_*`` methods (all of which return ``self`` for chaining) and turned
    into a SelectionOption by ``format()``.
    """

    @abstractmethod
    def __init__(self, _type, label):
        # Extra key/value pairs describing the device.
        self.__device_dict = dict()
        # Options handed to SelectionOption; filled in by __on_config__.
        self.__config = dict()
        self.label = label
        self.type = _type

    def add_environment_content(self, content):
        """
        Merge environment content into the selector.

        Args:
            content: a dict, a list whose first item is a dict, or a JSON
                string encoding either of those. For lists only the first
                item is used; an empty list adds nothing. Values of any
                other type are ignored.

        Returns:
            self

        Raises:
            RuntimeError: if ``content`` is a string that is neither a JSON
                array nor a JSON object.
        """
        _content = content
        if isinstance(_content, str):
            _content = _content.strip()
            is_array = _content.startswith("[") and _content.endswith("]")
            is_object = _content.startswith("{") and _content.endswith("}")
            if not (is_array or is_object):
                raise RuntimeError("Invalid str input! ['{}']".format(_content))
            # Decode once, then fall through to the list/dict handling below.
            _content = json.loads(_content)
        if isinstance(_content, list):
            # An empty list carries no entry to merge; indexing it would
            # raise IndexError.
            if _content:
                self.__device_dict.update(_content[0])
        elif isinstance(_content, dict):
            self.__device_dict.update(_content)
        return self

    @abstractmethod
    def __on_config__(self, config, device_dict):
        """
        Hook: fill ``config`` with the options SelectionOption needs.
        """

    @abstractmethod
    def __on_selection_option__(self, selection_option):
        """
        Hook: adjust the freshly built selection option.
        """

    def add_label(self, label):
        """
        Set the wanted label and return ``self``.
        """
        self.label = label
        return self

    def add_type(self, _type):
        """
        Set the wanted type and return ``self``.
        """
        self.type = _type
        return self

    def format(self):
        """
        Build a SelectionOption from everything collected so far.

        The collected environment content and config are cleared afterwards,
        so the returned option receives its own copy of the content.

        Returns:
            SelectionOption with ``required_manager``, ``extend_value`` and
            ``env_index`` filled in.
        """
        if self.type or self.label:
            self.__device_dict.update({"type": self.type})
            self.__device_dict.update({"label": self.label})
        self.__on_config__(self.__config, self.__device_dict)
        label = self.__device_dict.get("label", "phone")
        required_manager = self.__device_dict.get("type", "device")
        device_option = SelectionOption(self.__config, label)
        # "type" and "label" are carried by dedicated fields, not by the
        # extend value.
        self.__device_dict.pop("type", None)
        self.__device_dict.pop("label", None)
        device_option.required_manager = required_manager
        # Copy: the internal dict is cleared below, and sharing the same
        # object would leave the returned option with an empty extend_value.
        device_option.extend_value = dict(self.__device_dict)
        if hasattr(device_option, "env_index"):
            device_option.env_index = 1
        self.__on_selection_option__(device_option)
        self.__device_dict.clear()
        self.__config.clear()
        return device_option
301
302
class SelectionOption:
    """
    Criteria a device has to satisfy to be handed out.
    """

    def __init__(self, options, label=None):
        """
        Args:
            options: mapping that may hold "device_sn", a ";"-separated
                string of serial numbers. A missing, None or empty value
                means no serial number constraint.
            label: wanted device label.
        """
        raw_sn = options.get("device_sn") if options else ""
        # Empty fragments (e.g. from "a;;b" or a trailing ";") are dropped.
        self.device_sn = [x for x in (raw_sn or "").split(";") if x]
        self.label = label
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.env_index = None

    def get_label(self):
        """
        Return the wanted label.
        """
        return self.label

    def get_env_index(self):
        """
        Return the environment index (None until one is assigned).
        """
        return self.env_index

    def matches(self, device):
        """
        Tell whether ``device`` satisfies this option.

        A device is rejected when its ``task_state`` attribute is falsy
        (a missing attribute counts as True), or when serial numbers were
        requested and the device's serial number is not among them.
        Note that the label is only logged here, not compared.
        """
        LOG.info("Do matches, device:[state:{}, sn:{}, label:{}], selection "
                 "option:[device sn:{}, label:{}]".format(
                   device.device_allocation_state,
                   convert_serial(device.device_sn),
                   device.label,
                   [convert_serial(sn) if sn else "" for sn in self.device_sn],
                   self.label))
        if not getattr(device, "task_state", True):
            return False

        if self.device_sn and device.device_sn not in self.device_sn:
            return False

        return True
333
334
class DeviceNode(XMLNode):
    """
    XML node describing one device entry with ``<sn>``, ``<ip>`` and
    ``<port>`` children.
    """

    def __init__(self, usb_type, label=""):
        """
        Args:
            usb_type: value written to the "type" attribute of ``<device>``.
            label: value written to the "label" attribute; omitted when empty.
        """
        super().__init__()
        self.usb_type = usb_type
        self.label = label
        self.get_root_node().append(self.create_node("sn"))
        self.get_root_node().append(self.create_node("ip"))
        self.get_root_node().append(self.create_node("port"))

    def __on_root_attrib__(self, attrib_dict):
        """
        Provide the "type" and, when set, the "label" attribute.
        """
        attrib_dict.update({"type": self.usb_type})
        if self.label:
            attrib_dict.update({"label": self.label})

    @staticmethod
    def _as_text(value):
        """
        Return ``value`` as element text: None stays None, anything else
        becomes its string form.
        """
        return None if value is None else str(value)

    def add_address(self, host, port):
        """
        Set the text of the ``<ip>`` and ``<port>`` elements.

        Values are converted to strings because ElementTree cannot serialise
        non-string text (an integer port would make ``format()`` raise).

        Returns:
            self
        """
        host_ele = self.get_root_node().find("ip")
        port_ele = self.get_root_node().find("port")
        host_ele.text = self._as_text(host)
        port_ele.text = self._as_text(port)
        return self

    def add_device_sn(self, device_sn):
        """
        Add a serial number to the ``<sn>`` element.

        Serial numbers are joined with ";" when one is already present.
        A None serial number is ignored instead of being appended as the
        text "None".

        Returns:
            self
        """
        if device_sn is None:
            return self
        sn_ele = self.get_root_node().find("sn")
        if sn_ele.text:
            sn_ele.text = "{};{}".format(sn_ele.text, device_sn)
        else:
            sn_ele.text = self._as_text(device_sn)
        return self
364
365
class DeviceSelector(Selector):
    """
    Selector that can additionally pin the wanted device serial number(s).
    """

    def __init__(self, _type="", label=""):
        super().__init__(_type, label)
        # Passed on unchanged as the "device_sn" config option.
        self.device_sn = ""

    def __on_config__(self, config, device_dict):
        """
        Publish the serial number under the "device_sn" config key.
        """
        config["device_sn"] = self.device_sn

    def __on_selection_option__(self, selection_option):
        """
        Nothing to adjust on the selection option.
        """
        return None

    def add_device_sn(self, device_sn):
        """
        Set the wanted serial number and return ``self`` for chaining.
        """
        self.device_sn = device_sn
        return self
381
382
class Cache:
    """
    Marker file whose modification time decides whether the env pool runs
    in cache mode.
    """

    def __init__(self):
        from xdevice import Variables
        self.cache_file = os.path.join(Variables.temp_dir, "cache.dat")
        self.expire_time = 1  # days

    def check_cache_if_expire(self):
        """
        Publish the cache mode on ``sys`` under ``ConfigConst.env_pool_cache``.

        The flag is True when the cache file exists and was modified less
        than ``expire_time`` whole days ago. Otherwise the file is rewritten
        and the flag is False.
        """
        still_valid = False
        if os.path.exists(self.cache_file):
            modified_at = os.path.getmtime(self.cache_file)
            now = time.time()
            still_valid = Cache.get_delta_days(modified_at, now) < self.expire_time

        if still_valid:
            setattr(sys, ConfigConst.env_pool_cache, True)
            LOG.info("Env pool running in cache mode.")
            return

        self.update_cache()
        setattr(sys, ConfigConst.env_pool_cache, False)
        LOG.info("Env pool running in normal mode.")

    @staticmethod
    def get_delta_days(t1, t2):
        """
        Return the number of whole days from timestamp ``t1`` to ``t2``.
        """
        from datetime import datetime
        elapsed = datetime.fromtimestamp(t2) - datetime.fromtimestamp(t1)
        return elapsed.days

    def update_cache(self):
        """
        Write to the cache file, creating it when necessary.
        """
        # NOTE(review): the file is opened in append mode, so every call adds
        # another three bytes - confirm whether truncation was intended.
        open_flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        descriptor = os.open(self.cache_file, open_flags, FilePermission.mode_755)
        with os.fdopen(descriptor, "wb") as cache_handle:
            cache_handle.write(b'123')
413
414
def is_env_pool_run_mode():
    """
    Return True once an EnvPool instance has been created, False otherwise.
    """
    return EnvPool.instance is not None