• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17import json
18import os
19import time
20import sys
21
22from abc import abstractmethod
23from abc import ABCMeta
24from xml.etree import ElementTree
25from xml.etree.ElementTree import Element
26
27
28from _core.interface import IFilter
29from _core.interface import IDeviceManager
30from _core.logger import platform_logger
31from _core.plugin import Plugin
32from _core.plugin import get_plugin
33from _core.utils import convert_serial
34from _core.utils import get_cst_time
35from _core.logger import change_logger_level
36from _core.constants import ConfigConst
37from _core.constants import FilePermission
38from _core.context.log import RuntimeLogs
39
# Module-wide logger; every message of this file is emitted under "EnvPool".
LOG = platform_logger("EnvPool")

# Names exported by ``from <this module> import *``.
__all__ = [
    "EnvPool",
    "XMLNode",
    "Selector",
    "DeviceSelector",
    "DeviceNode",
    "is_env_pool_run_mode",
]
43
44
class EnvPool(object):
    """
    Singleton environment pool managing the set of devices available for
    testing. It is used directly by users without going through the
    command flow.

    Usage visible from this class: construct the pool, call ``init_pool``
    with a node describing the environment, request devices through
    ``get_device`` and finally call ``shutdown``.

    Class attributes:
        instance: the shared ``EnvPool`` object, None when no pool is alive.
        report_path: folder that receives the task log of the pool.
        resource_path: resource folder, also published on the ``sys`` module
            as ``ecotest_resource_path``.
    """
    instance = None
    __init_flag = False
    report_path = None
    resource_path = None

    def __new__(cls, *args, **kwargs):
        """
        Return the shared instance, creating it on first use.
        """
        del args, kwargs
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    def __init__(self, **kwargs):
        """
        Restart the task log and, on first construction only, load the
        device managers and prepare the cache marker.

        Keyword Args:
            report_path: folder for the task log; when empty a time-stamped
                folder below ``<cwd>/reports`` is used.
            log_level: console log level, "debug" or "info" (default "info").
            resource_path: resource folder, default ``<cwd>/resource``.
        """
        EnvPool.report_path = kwargs.get("report_path", "")
        # The task log is restarted on every construction, even when the
        # singleton has already been initialised.
        self._stop_task_log()
        self._start_task_log()
        if EnvPool.__init_flag:
            return
        self._managers = {}
        self._filters = {}
        # Created up front so that get_device()/shutdown() still find the
        # attribute if one of the later initialisation steps raises after
        # the init flag has been set.
        self.devices = list()
        self._init_log_level(kwargs.get("log_level", "info"))
        self._load_managers()
        EnvPool.__init_flag = True
        EnvPool.resource_path = kwargs.get("resource_path",
                                           os.path.join(os.path.abspath(os.getcwd()), "resource"))
        setattr(sys, "ecotest_resource_path", EnvPool.resource_path)
        # init cache file and check if expire
        cache_file = Cache()
        cache_file.check_cache_if_expire()

    def _load_managers(self):
        """
        Fill ``self._managers`` with device managers keyed by class name.

        Managers of an already initialised ``EnvironmentManager`` are reused;
        otherwise every manager plugin is instantiated. A plugin that fails
        to instantiate is logged and skipped.
        """
        LOG.info("Load Managers ...")
        # imported here rather than at module level (presumably to avoid a
        # circular import - confirm)
        from xdevice import EnvironmentManager
        if EnvironmentManager._EnvironmentManager__init_flag:
            LOG.info("Managers has inited, using EnvironmentManager managers...")
            self._managers = EnvironmentManager._EnvironmentManager__instance.managers
            return
        manager_plugins = get_plugin(Plugin.MANAGER)
        for manager_plugin in manager_plugins:
            try:
                manager_instance = manager_plugin.__class__()
                self._managers[manager_instance.__class__.__name__] = manager_instance
            except Exception as error:
                LOG.error("Pool start error: {}".format(error))
        # Sort by class name in reverse order so that OH devices are
        # loaded first.
        if self._managers:
            self._managers = dict(sorted(self._managers.items(), reverse=True))

    def _unload_manager(self):
        """
        Stop every manager that was activated by ``init_pool``, then forget
        all managers and mark the pool as uninitialised.
        """
        for manager in self._managers.values():
            if manager.__class__.__name__ not in self._filters:
                continue
            manager.devices_list.clear()
            manager.env_stop()
        self._managers.clear()
        EnvPool.__init_flag = False

    def get_device(self, selector, timeout=10):
        """
        Apply for a device matching ``selector``.

        Args:
            selector: selector object describing the wanted device.
            timeout: forwarded to the manager's ``apply_device``
                (presumably seconds - confirm against the managers).

        Returns:
            The allocated device, which is also remembered in
            ``self.devices``, or None when no manager could provide one.
        """
        LOG.info("Get device by selector")
        device = self._apply_device(selector, timeout)
        if device is not None:
            LOG.info("Device {}: extend value: {}".format(
                convert_serial(device.device_sn), device.extend_value))
            self.devices.append(device)
        else:
            LOG.info("Require label is '{}', can't get device".
                     format(selector.label))
        return device

    def init_pool(self, node):
        """
        Initialise every loaded manager that accepts ``node``.

        A manager is used when it is an ``IFilter`` whose
        ``__filter_xml_node__`` accepts the node and it is also an
        ``IDeviceManager``; it is then initialised with ``node.format()``
        and registered in ``self._filters``.

        Args:
            node: environment description exposing ``format()``.
        """
        LOG.info("Prepare to init pool")
        for manager in self._managers.values():
            if not isinstance(manager, IFilter):
                continue
            if not manager.__filter_xml_node__(node):
                continue
            if not isinstance(manager, IDeviceManager):
                continue
            manager.init_environment(node.format(), "")
            self._filters[manager.__class__.__name__] = manager
            LOG.info("Pool is prepared")
        if not self._filters:
            # The trailing space keeps the two literal parts from being
            # glued together ("assignor") in the emitted message.
            LOG.info("Can't find any manager, may be no connector are assign "
                     "or the plugins of manager are not installed!")

    def shutdown(self):
        """
        Release the ports of all handed-out devices, stop the managers and
        the task log, and drop the singleton so a new pool can be created.
        """
        # clear device rpc port
        for device in self.devices:
            if hasattr(device, "remove_ports"):
                device.remove_ports()
        self._unload_manager()
        self._stop_task_log()
        EnvPool.instance = None
        EnvPool.__init_flag = False

    def _apply_device(self, selector, timeout=3):
        """
        Ask the activated managers, in order, for a device matching
        ``selector`` and return the first one obtained.

        A manager is skipped when it rejects the selector, does not support
        the required manager type, declares no labels, or does not support
        the requested label. A request without label (None) is only served
        by the manager registered as "ManagerDevice".

        Returns:
            The first truthy device returned by a manager, else None.
        """
        LOG.info("Apply device in pool")
        for manager_type, manager in self._filters.items():
            if not manager.__filter_selector__(selector):
                continue
            device_option = selector.format()
            if not device_option:
                continue
            support_labels = getattr(manager, "support_labels", [])
            support_types = getattr(manager, "support_types", [])
            if device_option.required_manager not in support_types:
                LOG.info("'{}' not in {}'s support types".format(
                    device_option.required_manager, manager_type))
                continue
            if not support_labels:
                continue
            if device_option.label is None:
                if manager_type != "ManagerDevice":
                    continue
            elif device_option.label not in support_labels:
                continue
            device = manager.apply_device(device_option, timeout)
            if hasattr(device, "env_index"):
                device.env_index = device_option.get_env_index()
            if device:
                return device
        return None

    @classmethod
    def _init_log_level(cls, level):
        """
        Set the console log level; values other than "debug"/"info"
        (case-insensitive) are reported and ignored.
        """
        if str(level).lower() not in ["debug", "info"]:
            LOG.info("Level str must be 'debug' or 'info'")
            return
        change_logger_level({"console": level})

    @classmethod
    def _start_task_log(cls):
        """
        Make sure the report folder exists, store it in
        ``EnvPool.report_path`` and start the task log in it.

        When no report path was configured, a folder named after the current
        time is used below ``<cwd>/reports``.
        """
        report_folder_path = EnvPool.report_path
        if not report_folder_path:
            report_folder_path = os.path.join(
                os.path.abspath(os.getcwd()), "reports", get_cst_time().strftime("%Y-%m-%d-%H-%M-%S"))
        # exist_ok avoids the check-then-create race when another process
        # creates the same folder in between.
        os.makedirs(report_folder_path, exist_ok=True)
        LOG.info("Report path: {}".format(report_folder_path))
        EnvPool.report_path = report_folder_path
        RuntimeLogs.start_task_log(report_folder_path)

    @classmethod
    def _stop_task_log(cls):
        """
        Stop the running task log.
        """
        RuntimeLogs.stop_task_logcat()
202
203
class XMLNode(metaclass=ABCMeta):
    """
    Abstract builder of the ``<device>`` element of a user configuration.

    Subclasses supply the attributes of the element through
    ``__on_root_attrib__``; ``format`` wraps the element into
    ``<user_config><environment>...</environment></user_config>``.
    Mutating methods return ``self`` so calls can be chained.
    """

    @abstractmethod
    def __init__(self):
        self.__connectors = []
        self.__device_ele = Element("device")

    @abstractmethod
    def __on_root_attrib__(self, attrib_dict):
        """
        Hook: store the attributes of the device element in ``attrib_dict``.
        """

    def add_element_string(self, element_str=""):
        """
        Replace the device element by the one parsed from ``element_str``.

        Empty input, or XML whose root tag is not "device", leaves the
        current element untouched.
        """
        if not element_str:
            return self
        parsed = ElementTree.fromstring(element_str)
        if parsed.tag == "device":
            self.__device_ele = parsed
        return self

    @classmethod
    def create_node(cls, tag):
        """
        Create a new, empty element named ``tag``.
        """
        return Element(tag)

    def build_connector(self, connector_name):
        """
        Record ``connector_name`` in the connector list.
        """
        self.__connectors.append(connector_name)
        return self

    def get_connectors(self):
        """
        Return the list of recorded connector names.
        """
        return self.__connectors

    def format(self):
        """
        Serialize the configuration as UTF-8 encoded XML bytes.

        The attributes of the device element are replaced by the ones
        delivered by ``__on_root_attrib__`` before serialization.
        """
        root_attrib = {}
        self.__on_root_attrib__(root_attrib)
        self.__device_ele.attrib = root_attrib
        environment = self.create_node("environment")
        environment.append(self.__device_ele)
        user_config = self.create_node("user_config")
        user_config.append(environment)
        return ElementTree.tostring(user_config, encoding="utf-8")

    def get_root_node(self):
        """
        Return the current device element.
        """
        return self.__device_ele
245
246
class Selector(metaclass=ABCMeta):
    """
    Abstract description of a device request.

    A selector collects a manager type, a label and free-form environment
    content, and turns them into a ``SelectionOption`` with ``format``.
    Subclasses contribute through the ``__on_config__`` and
    ``__on_selection_option__`` hooks. Mutating methods return ``self`` so
    calls can be chained.
    """

    @abstractmethod
    def __init__(self, _type, label):
        self.__device_dict = dict()
        self.__config = dict()
        self.label = label
        self.type = _type

    def add_environment_content(self, content):
        """
        Merge environment content into the pending device description.

        Args:
            content: a dict, a list whose first entry is used, or a JSON
                string encoding one of these. An empty list contributes
                nothing. Any other type is ignored.

        Returns:
            self

        Raises:
            RuntimeError: if a string is neither a JSON array nor a JSON
                object.
        """
        _content = content
        if isinstance(_content, str):
            _content = _content.strip()
            is_array = _content.startswith("[") and _content.endswith("]")
            is_object = _content.startswith("{") and _content.endswith("}")
            if not (is_array or is_object):
                raise RuntimeError("Invalid str input! ['{}']".format(_content))
            _content = json.loads(_content)
        if isinstance(_content, list):
            # Only the first entry is used; an empty list used to raise
            # IndexError and is now treated as "no content".
            if _content:
                self.__device_dict.update(_content[0])
        elif isinstance(_content, dict):
            self.__device_dict.update(_content)
        return self

    @abstractmethod
    def __on_config__(self, config, device_dict):
        """
        Hook: fill ``config`` (handed to ``SelectionOption``) and optionally
        adjust ``device_dict`` before the option is built.
        """

    @abstractmethod
    def __on_selection_option__(self, selection_option):
        """
        Hook: adjust the freshly built ``selection_option``.
        """

    def add_label(self, label):
        """
        Set the requested label.
        """
        self.label = label
        return self

    def add_type(self, _type):
        """
        Set the requested manager type.
        """
        self.type = _type
        return self

    def format(self):
        """
        Build the ``SelectionOption`` for this request.

        When a type or label is set on the selector, both override the
        values from the environment content; otherwise label defaults to
        "phone" and the required manager to "device". The remaining
        environment content becomes the option's ``extend_value``.

        The accumulated environment content and config are cleared
        afterwards, so content has to be added again before the next call.

        Returns:
            SelectionOption: the option describing the wanted device.
        """
        if self.type or self.label:
            self.__device_dict.update({"type": self.type})
            self.__device_dict.update({"label": self.label})
        self.__on_config__(self.__config, self.__device_dict)
        label = self.__device_dict.get("label", "phone")
        required_manager = self.__device_dict.get("type", "device")
        device_option = SelectionOption(self.__config, label)
        self.__device_dict.pop("type", None)
        self.__device_dict.pop("label", None)
        device_option.required_manager = required_manager
        # Hand over a copy: the internal dict is cleared below, which would
        # otherwise empty the option's extend_value as well.
        device_option.extend_value = dict(self.__device_dict)
        if hasattr(device_option, "env_index"):
            # a selector describes exactly one device, hence index 1
            device_option.env_index = 1
        self.__on_selection_option__(device_option)
        self.__device_dict.clear()
        self.__config.clear()
        return device_option
308
309
class SelectionOption:
    """
    Requirements a device has to meet in order to be allocated.

    Attributes:
        device_sn: list of accepted serial numbers; empty means any.
        label: requested device label.
        source_file: initialised to "" here; not used inside this class.
        extend_value: additional key/value requirements.
        required_manager: type of manager that has to serve the request.
        env_index: index of the device within the environment, or None.
    """

    def __init__(self, options, label=None):
        """
        Args:
            options: dict whose "device_sn" entry holds the accepted serial
                numbers separated by ";". A missing or empty entry accepts
                every serial number.
            label: requested device label.
        """
        # Tolerate a missing/None entry instead of raising KeyError or
        # AttributeError; empty fragments between separators are dropped.
        raw_sn = options.get("device_sn") or ""
        self.device_sn = [x for x in raw_sn.split(";") if x]
        self.label = label
        self.source_file = ""
        self.extend_value = {}
        self.required_manager = ""
        self.env_index = None

    def get_label(self):
        """
        Return the requested label.
        """
        return self.label

    def get_env_index(self):
        """
        Return the environment index of the request.
        """
        return self.env_index

    def matches(self, device):
        """
        Tell whether ``device`` satisfies this option.

        A device is rejected when its ``task_state`` attribute is falsy
        (a missing attribute counts as True), or when serial numbers are
        required and the device's serial number is not among them.
        """
        LOG.info("Do matches, device:[state:{}, sn:{}, label:{}], selection "
                 "option:[device sn:{}, label:{}]".format(
                   device.device_allocation_state,
                   convert_serial(device.device_sn),
                   device.label,
                   [convert_serial(sn) if sn else "" for sn in self.device_sn],
                   self.label))
        if not getattr(device, "task_state", True):
            return False

        if self.device_sn and device.device_sn not in self.device_sn:
            return False

        return True
340
341
class DeviceNode(XMLNode):
    """
    Device element carrying a connection type, an optional label and the
    ``sn``, ``ip`` and ``port`` child elements.
    """

    def __init__(self, usb_type, label=""):
        """
        Args:
            usb_type: value of the element's "type" attribute.
            label: value of the "label" attribute; omitted when empty.
        """
        super().__init__()
        self.usb_type = usb_type
        self.label = label
        for tag in ("sn", "ip", "port"):
            self.get_root_node().append(self.create_node(tag))

    def __on_root_attrib__(self, attrib_dict):
        attrib_dict.update({"type": self.usb_type})
        if self.label:
            attrib_dict.update({"label": self.label})

    def _get_or_create_child(self, tag):
        """
        Return the root's child named ``tag``, appending it when absent.

        The child can be absent after ``add_element_string`` installed a
        device element that does not contain it.
        """
        root = self.get_root_node()
        child = root.find(tag)
        if child is None:
            child = self.create_node(tag)
            root.append(child)
        return child

    @staticmethod
    def _as_text(value):
        """
        Convert ``value`` to element text; None stays None (empty element).

        Element text has to be a string, otherwise serializing the node in
        ``format`` fails.
        """
        return None if value is None else str(value)

    def add_address(self, host, port):
        """
        Set the ``ip`` and ``port`` children.

        Args:
            host: address of the device.
            port: port of the device; non-string values such as int are
                converted to text.

        Returns:
            self
        """
        self._get_or_create_child("ip").text = self._as_text(host)
        self._get_or_create_child("port").text = self._as_text(port)
        return self

    def add_device_sn(self, device_sn):
        """
        Add a serial number; several serial numbers are joined with ";".

        Returns:
            self
        """
        sn_ele = self._get_or_create_child("sn")
        if sn_ele.text:
            sn_ele.text = "{};{}".format(sn_ele.text, device_sn)
        else:
            sn_ele.text = self._as_text(device_sn)
        return self
371
372
class DeviceSelector(Selector):
    """
    Selector that can additionally restrict the request to given device
    serial numbers.
    """

    def __init__(self, _type="", label=""):
        super().__init__(_type, label)
        self.device_sn = ""

    def __on_config__(self, config, device_dict):
        # publish the serial number restriction to the selection option
        config["device_sn"] = self.device_sn

    def __on_selection_option__(self, selection_option):
        """
        The selection option is left as built.
        """
        return None

    def add_device_sn(self, device_sn):
        """
        Set the serial number restriction and return ``self`` for chaining.
        """
        self.device_sn = device_sn
        return self
388
389
class Cache:
    """
    Marker file ``cache.dat`` in the xdevice temp directory whose
    modification time decides whether the env pool runs in cache mode.
    """

    def __init__(self):
        from xdevice import Variables
        self.expire_time = 1  # days
        self.cache_file = os.path.join(Variables.temp_dir, "cache.dat")

    def check_cache_if_expire(self):
        """
        Publish the cache mode on the ``sys`` module under the attribute
        named by ``ConfigConst.env_pool_cache``.

        The mode is True when the marker file exists and is younger than
        ``expire_time`` days; otherwise the marker is rewritten and the mode
        is False.
        """
        cache_is_fresh = False
        if os.path.exists(self.cache_file):
            modified_at = os.path.getmtime(self.cache_file)
            now = time.time()
            cache_is_fresh = Cache.get_delta_days(modified_at, now) < self.expire_time
        if cache_is_fresh:
            setattr(sys, ConfigConst.env_pool_cache, True)
            LOG.info("Env pool running in cache mode.")
        else:
            self.update_cache()
            setattr(sys, ConfigConst.env_pool_cache, False)
            LOG.info("Env pool running in normal mode.")

    @staticmethod
    def get_delta_days(t1, t2):
        """
        Return the number of whole days from timestamp ``t1`` to ``t2``.
        """
        from datetime import datetime
        elapsed = datetime.fromtimestamp(t2) - datetime.fromtimestamp(t1)
        return elapsed.days

    def update_cache(self):
        """
        Append a few bytes to the marker file, creating it when missing.
        """
        open_flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        descriptor = os.open(self.cache_file, open_flags, FilePermission.mode_755)
        with os.fdopen(descriptor, "wb") as cache_fp:
            cache_fp.write(b'123')  # write bytes data
420
421
def is_env_pool_run_mode():
    """
    Tell whether an ``EnvPool`` instance is currently alive.
    """
    return EnvPool.instance is not None
424