• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import time
20import threading
21
22from xdevice import UserConfigManager
23from xdevice import DeviceOsType
24from xdevice import ManagerType
25from xdevice import DeviceAllocationState
26from xdevice import Plugin
27from xdevice import get_plugin
28from xdevice import IDeviceManager
29from xdevice import platform_logger
30from xdevice import convert_ip
31from xdevice import convert_port
32from xdevice import convert_serial
33
34from ohos.exception import LiteDeviceError
35
36__all__ = ["ManagerLite"]
37
38LOG = platform_logger("ManagerLite")
39
40
41@Plugin(type=Plugin.MANAGER, id=ManagerType.lite_device)
42class ManagerLite(IDeviceManager):
43    """
44    Class representing device manager that
45    managing the set of available devices for testing
46    """
47
48    def __init__(self):
49        self.devices_list = []
50        self.list_con = threading.Condition()
51        self.support_labels = ["ipcamera", "wifiiot", "watchGT"]
52        self.support_types = ["device"]
53
54    def init_environment(self, environment="", user_config_file=""):
55        device_lite = get_plugin(plugin_type=Plugin.DEVICE,
56                                 plugin_id=DeviceOsType.lite)[0]
57
58        devices = UserConfigManager(
59            config_file=user_config_file, env=environment).get_com_device(
60            "environment/device")
61
62        for device in devices:
63            try:
64                device_lite_instance = device_lite.__class__()
65                device_lite_instance.__init_device__(device)
66                device_lite_instance.device_allocation_state = \
67                    DeviceAllocationState.available
68            except LiteDeviceError as exception:
69                LOG.warning(exception)
70                continue
71
72            self.devices_list.append(device_lite_instance)
73
74    def env_stop(self):
75        pass
76
77    def apply_device(self, device_option, timeout=10):
78        """
79        Request a device for testing that meets certain criteria.
80        """
81        del timeout
82        LOG.debug("Lite apply device: apply lock")
83        self.list_con.acquire()
84        try:
85            allocated_device = None
86            for device in self.devices_list:
87                if device_option.matches(device):
88                    device.device_allocation_state = \
89                        DeviceAllocationState.allocated
90                    LOG.debug("Allocate device sn: %s, type: %s" % (
91                        convert_serial(device.__get_serial__()),
92                        device.__class__))
93                    return device
94            time.sleep(10)
95            return allocated_device
96        finally:
97            LOG.debug("Lite apply device: release lock")
98            self.list_con.release()
99
100    def release_device(self, device):
101        LOG.debug("Lite release device: apply lock")
102        self.list_con.acquire()
103        try:
104            if device.device_allocation_state == \
105                    DeviceAllocationState.allocated:
106                device.device_allocation_state = \
107                    DeviceAllocationState.available
108            LOG.debug("Free device sn: %s, type: %s" % (
109                device.__get_serial__(), device.__class__))
110        finally:
111            LOG.debug("Lite release device: release lock")
112            self.list_con.release()
113
114    def reset_device(self, device):
115        pass
116
117    def list_devices(self):
118        print("Lite devices:")
119        print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}".
120              format("SerialPort/IP", "Baudrate/Port", "OsType", "Allocation",
121                     "Product", "ConnectType", "ComType"))
122        for device in self.devices_list:
123            if device.device_connect_type == "remote" or \
124                    device.device_connect_type == "agent":
125                print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}".format(
126                    convert_ip(device.device.host),
127                    convert_port(device.device.port),
128                    device.device_os_type,
129                    device.device_allocation_state,
130                    device.label,
131                    device.device_connect_type))
132            else:
133                for com_controller in device.device.com_dict:
134                    print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}".
135                          format(convert_port(device.device.com_dict[
136                                     com_controller].serial_port),
137                                 device.device.com_dict[
138                                     com_controller].baud_rate,
139                                 device.device_os_type,
140                                 device.device_allocation_state,
141                                 device.label,
142                                 device.device_connect_type,
143                                 com_controller))
144