• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import re
20import telnetlib
21import time
22import os
23import threading
24
25from xdevice import DeviceOsType
26from xdevice import DeviceProperties
27from xdevice import ConfigConst
28from xdevice import DeviceLabelType
29from xdevice import ModeType
30from xdevice import IDevice
31from xdevice import platform_logger
32from xdevice import DeviceAllocationState
33from xdevice import Plugin
34from xdevice import exec_cmd
35from xdevice import convert_serial
36from xdevice import convert_ip
37from xdevice import check_mode
38
39from ohos.exception import LiteDeviceConnectError
40from ohos.exception import LiteDeviceTimeout
41from ohos.exception import LiteParamError
42from ohos.environment.dmlib_lite import LiteHelper
43from ohos.constants import ComType
44
45LOG = platform_logger("DeviceLite")
46TIMEOUT = 90
47RETRY_ATTEMPTS = 0
48HDC = "litehdc.exe"
49DEFAULT_BAUD_RATE = 115200
50
51
52def get_hdc_path():
53    from xdevice import Variables
54    user_path = os.path.join(Variables.exec_dir, "resource/tools")
55    top_user_path = os.path.join(Variables.top_dir, "config")
56    config_path = os.path.join(Variables.res_dir, "config")
57    paths = [user_path, top_user_path, config_path]
58
59    file_path = ""
60    for path in paths:
61        if os.path.exists(os.path.abspath(os.path.join(
62                path, HDC))):
63            file_path = os.path.abspath(os.path.join(
64                path, HDC))
65            break
66
67    if os.path.exists(file_path):
68        return file_path
69    else:
70        raise LiteParamError("litehdc.exe not found", error_no="00108")
71
72
73def parse_available_com(com_str):
74    com_str = com_str.replace("\r", " ")
75    com_list = com_str.split("\n")
76    for index, item in enumerate(com_list):
77        com_list[index] = item.strip().strip(b'\x00'.decode())
78    return com_list
79
80
81def perform_device_action(func):
82    def device_action(self, *args, **kwargs):
83        if not self.get_recover_state():
84            LOG.debug("Device %s %s is false" % (self.device_sn,
85                                                 ConfigConst.recover_state))
86            return "", "", ""
87
88        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
89        retry = tmp + 1 if tmp > 0 else 1
90        exception = None
91        for num in range(retry):
92            try:
93                result = func(self, *args, **kwargs)
94                return result
95            except LiteDeviceTimeout as error:
96                LOG.error(error)
97                exception = error
98                if num:
99                    self.recover_device()
100            except Exception as error:
101                LOG.error(error)
102                exception = error
103        raise exception
104
105    return device_action
106
107
108@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
109class DeviceLite(IDevice):
110    """
111    Class representing a device lite device.
112
113    Each object of this class represents one device lite device in xDevice.
114
115    Attributes:
116        device_connect_type: A string that's the type of lite device
117    """
118    device_os_type = DeviceOsType.lite
119    device_allocation_state = DeviceAllocationState.available
120
121    def __init__(self):
122        self.device_sn = ""
123        self.label = ""
124        self.device_connect_type = ""
125        self.device_kernel = ""
126        self.device = None
127        self.ifconfig = None
128        self.device_id = None
129        self.extend_value = {}
130        self.device_lock = threading.RLock()
131        self.device_props = {}
132        self.device_description = {}
133
134    def init_description(self):
135        if self.device_description:
136            return
137        desc = {
138            DeviceProperties.sn: convert_serial(self.device_sn),
139            DeviceProperties.model: self.label,
140            DeviceProperties.type_: self.label,
141            DeviceProperties.platform: "OpenHarmony",
142            DeviceProperties.version: "",
143            DeviceProperties.others: self.device_props
144        }
145        self.device_description.update(desc)
146
147    def __set_serial__(self, device=None):
148        for item in device:
149            if "ip" in item.keys() and "port" in item.keys():
150                self.device_sn = "remote_%s_%s" % \
151                                 (item.get("ip"), item.get("port"))
152                break
153            elif "type" in item.keys() and "com" in item.keys():
154                self.device_sn = "local_%s" % item.get("com")
155                break
156
157    def __get_serial__(self):
158        return self.device_sn
159
160    def get(self, key=None, default=None):
161        if not key:
162            return default
163        value = getattr(self, key, None)
164        if value:
165            return value
166        else:
167            return self.extend_value.get(key, default)
168
169    def update_device_props(self, props):
170        if self.device_props or not isinstance(props, dict):
171            return
172        self.device_props.update(props)
173
174    def __set_device_kernel__(self, kernel_type=""):
175        self.device_kernel = kernel_type
176
177    def __get_device_kernel__(self):
178        return self.device_kernel
179
180    @staticmethod
181    def _check_watchgt(device):
182        for item in device:
183            if "label" not in item.keys():
184                error_message = "watchGT local label does not exist"
185                raise LiteParamError(error_message, error_no="00108")
186            if "com" not in item.keys() or ("com" in item.keys() and
187                                            not item.get("com")):
188                error_message = "watchGT local com cannot be " \
189                                "empty, please check"
190                raise LiteParamError(error_message, error_no="00108")
191            else:
192                hdc = get_hdc_path()
193                result = exec_cmd([hdc])
194                com_list = parse_available_com(result)
195                if item.get("com").upper() in com_list:
196                    return True
197                else:
198                    error_message = "watchGT local com does not exist"
199                    raise LiteParamError(error_message, error_no="00108")
200
201    @staticmethod
202    def _check_wifiiot_config(device):
203        com_type_set = set()
204        for item in device:
205            if "label" not in item.keys():
206                if "com" not in item.keys() or ("com" in item.keys() and
207                                                not item.get("com")):
208                    error_message = "wifiiot local com cannot be " \
209                                    "empty, please check"
210                    raise LiteParamError(error_message, error_no="00108")
211
212                if "type" not in item.keys() or ("type" not in item.keys() and
213                                                 not item.get("type")):
214                    error_message = "wifiiot com type cannot be " \
215                                    "empty, please check"
216                    raise LiteParamError(error_message, error_no="00108")
217                else:
218                    com_type_set.add(item.get("type"))
219        if len(com_type_set) < 2:
220            error_message = "wifiiot need cmd com and deploy com" \
221                            " at the same time, please check"
222            raise LiteParamError(error_message, error_no="00108")
223
224    @staticmethod
225    def _check_ipcamera_local(device):
226        for item in device:
227            if "label" not in item.keys():
228                if "com" not in item.keys() or ("com" in item.keys() and
229                                                not item.get("com")):
230                    error_message = "ipcamera local com cannot be " \
231                                    "empty, please check"
232                    raise LiteParamError(error_message, error_no="00108")
233
234    @staticmethod
235    def _check_ipcamera_remote(device=None):
236        for item in device:
237            if "label" not in item.keys():
238                if "port" in item.keys() and item.get("port") and not item.get(
239                        "port").isnumeric():
240                    error_message = "ipcamera remote port should be " \
241                                    "a number, please check"
242                    raise LiteParamError(error_message, error_no="00108")
243                elif "port" not in item.keys():
244                    error_message = "ipcamera remote port cannot be" \
245                                    " empty, please check"
246                    raise LiteParamError(error_message, error_no="00108")
247
248    def __check_config__(self, device=None):
249        self.set_connect_type(device)
250        if self.label == DeviceLabelType.wifiiot:
251            self._check_wifiiot_config(device)
252        elif self.label == DeviceLabelType.ipcamera and \
253                self.device_connect_type == "local":
254            self._check_ipcamera_local(device)
255        elif self.label == DeviceLabelType.ipcamera and \
256                self.device_connect_type == "remote":
257            self._check_ipcamera_remote(device)
258        elif self.label == DeviceLabelType.watch_gt:
259            self._check_watchgt(device)
260
261    def set_connect_type(self, device):
262        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
263                  r'01]?\d\d?)$'
264        for item in device:
265            if "label" in item.keys():
266                self.label = item.get("label")
267            if "com" in item.keys():
268                self.device_connect_type = "local"
269            if "ip" in item.keys():
270                if re.match(pattern, item.get("ip")):
271                    self.device_connect_type = "remote"
272                else:
273                    error_message = "Remote device ip not in right" \
274                                         "format, please check user_config.xml"
275                    raise LiteParamError(error_message, error_no="00108")
276        if not self.label:
277            error_message = "device label cannot be empty, " \
278                            "please check"
279            raise LiteParamError(error_message, error_no="00108")
280        else:
281            if self.label != DeviceLabelType.wifiiot and \
282                    self.label != DeviceLabelType.ipcamera and \
283                    self.label != DeviceLabelType.watch_gt:
284                error_message = "device label should be ipcamera or" \
285                                     " wifiiot, please check"
286                raise LiteParamError(error_message, error_no="00108")
287        if not self.device_connect_type:
288            error_message = "device com or ip cannot be empty, " \
289                                 "please check"
290            raise LiteParamError(error_message, error_no="00108")
291
292    def __init_device__(self, device):
293        self.__check_config__(device)
294        self.__set_serial__(device)
295        if self.device_connect_type == "remote":
296            self.device = CONTROLLER_DICT.get("remote")(device)
297        else:
298            self.device = CONTROLLER_DICT.get("local")(device)
299
300        self.ifconfig = device[1].get("ifconfig")
301
302    def connect(self):
303        """
304        Connect the device
305
306        """
307        try:
308            self.device.connect()
309        except LiteDeviceConnectError as _:
310            if check_mode(ModeType.decc):
311                LOG.debug("Set device %s recover state to false" %
312                          self.device_sn)
313                self.device_allocation_state = DeviceAllocationState.unusable
314                self.set_recover_state(False)
315            raise
316
317    @perform_device_action
318    def execute_command_with_timeout(self, command="", case_type="",
319                                     timeout=TIMEOUT, **kwargs):
320        """Executes command on the device.
321
322        Args:
323            command: the command to execute
324            case_type: CTest or CppTest
325            timeout: timeout for read result
326            **kwargs: receiver - parser handler input
327
328        Returns:
329            (filter_result, status, error_message)
330
331            filter_result: command execution result
332            status: true or false
333            error_message: command execution error message
334        """
335        receiver = kwargs.get("receiver", None)
336        if self.device_connect_type == "remote":
337            LOG.info("%s execute command shell %s with timeout %ss" %
338                     (convert_serial(self.__get_serial__()), command,
339                      str(timeout)))
340            filter_result, status, error_message = \
341                self.device.execute_command_with_timeout(
342                    command=command,
343                    timeout=timeout,
344                    receiver=receiver)
345        elif self.device_connect_type == "agent":
346            filter_result, status, error_message = \
347                self.device.execute_command_with_timeout(
348                    command=command,
349                    case_type=case_type,
350                    timeout=timeout,
351                    receiver=receiver, type="cmd")
352        else:
353            filter_result, status, error_message = \
354                self.device.execute_command_with_timeout(
355                    command=command,
356                    case_type=case_type,
357                    timeout=timeout,
358                    receiver=receiver)
359        if not receiver:
360            LOG.debug("%s execute result:%s" % (
361                convert_serial(self.__get_serial__()), filter_result))
362        if not status:
363            LOG.debug(
364                "%s error_message:%s" % (convert_serial(self.__get_serial__()),
365                                         error_message))
366        return filter_result, status, error_message
367
368    def recover_device(self):
369        self.reboot()
370
371    def reboot(self):
372        self.connect()
373        filter_result, status, error_message = self. \
374            execute_command_with_timeout(command="reset", timeout=30)
375        if not filter_result:
376            if check_mode(ModeType.decc):
377                LOG.debug("Set device %s recover state to false" %
378                          self.device_sn)
379                self.device_allocation_state = DeviceAllocationState.unusable
380                self.set_recover_state(False)
381        if self.ifconfig:
382            enter_result, _, _ = self.execute_command_with_timeout(command='\r',
383                                                                   timeout=15)
384            if " #" in enter_result or "OHOS #" in enter_result:
385                LOG.info("Reset device %s success" % self.device_sn)
386                self.execute_command_with_timeout(command=self.ifconfig,
387                                                  timeout=5)
388            elif "hisilicon #" in enter_result:
389                LOG.info("Reset device %s fail" % self.device_sn)
390
391            ifconfig_result, _, _ = self.execute_command_with_timeout(
392                command="ifconfig",
393                timeout=5)
394
395    def close(self):
396        """
397        Close the telnet connection with device server or close the local
398        serial
399        """
400        self.device.close()
401
402    def set_recover_state(self, state):
403        with self.device_lock:
404            setattr(self, ConfigConst.recover_state, state)
405
406    def get_recover_state(self, default_state=True):
407        with self.device_lock:
408            state = getattr(self, ConfigConst.recover_state, default_state)
409            return state
410
411
412class RemoteController:
413    """
414    Class representing an device lite remote device.
415    Each object of this class represents one device lite remote device
416    in xDevice.
417    """
418
419    def __init__(self, device):
420        self.host = device[1].get("ip")
421        self.port = int(device[1].get("port"))
422        self.telnet = None
423
424    def connect(self):
425        """
426        Connect the device server
427
428        """
429        try:
430            if self.telnet:
431                return self.telnet
432            self.telnet = telnetlib.Telnet(self.host, self.port,
433                                           timeout=TIMEOUT)
434        except Exception as err_msgs:
435            error_message = "Connect remote lite device failed, host is %s, " \
436                            "port is %s, error is %s" % \
437                            (convert_ip(self.host), self.port, str(err_msgs))
438            raise LiteDeviceConnectError(error_message, error_no="00401")
439        time.sleep(2)
440        self.telnet.set_debuglevel(0)
441        return self.telnet
442
443    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
444                                     receiver=None):
445        """
446        Executes command on the device.
447
448        Parameters:
449            command: the command to execute
450            timeout: timeout for read result
451            receiver: parser handler
452        """
453        return LiteHelper.execute_remote_cmd_with_timeout(
454            self.telnet, command, timeout, receiver)
455
456    def close(self):
457        """
458        Close the telnet connection with device server
459        """
460        try:
461            if not self.telnet:
462                return
463            self.telnet.close()
464            self.telnet = None
465        except (ConnectionError, Exception) as _:
466            error_message = "Remote device is disconnected abnormally"
467            LOG.error(error_message, error_no="00401")
468
469
470class LocalController:
471    def __init__(self, device):
472        """
473        Init Local device.
474        Parameters:
475            device: local device
476        """
477        self.com_dict = {}
478        for item in device:
479            if "com" in item.keys():
480                if "type" in item.keys() and ComType.cmd_com == item.get(
481                        "type"):
482                    self.com_dict[ComType.cmd_com] = ComController(item)
483                elif "type" in item.keys() and ComType.deploy_com == item.get(
484                        "type"):
485                    self.com_dict[ComType.deploy_com] = ComController(item)
486
487    def connect(self, key=ComType.cmd_com):
488        """
489        Open serial.
490        """
491        self.com_dict.get(key).connect()
492
493    def close(self, key=ComType.cmd_com):
494        """
495        Close serial.
496        """
497        if self.com_dict and self.com_dict.get(key):
498            self.com_dict.get(key).close()
499
500    def execute_command_with_timeout(self, **kwargs):
501        """
502        Execute command on the serial and read all the output from the serial.
503        """
504        args = kwargs
505        key = args.get("key", ComType.cmd_com)
506        command = args.get("command", None)
507        case_type = args.get("case_type", "")
508        receiver = args.get("receiver", None)
509        timeout = args.get("timeout", TIMEOUT)
510        return self.com_dict.get(key).execute_command_with_timeout(
511            command=command, case_type=case_type,
512            timeout=timeout, receiver=receiver)
513
514
515class ComController:
516    def __init__(self, device):
517        """
518        Init serial.
519        Parameters:
520            device: local com
521        """
522        self.is_open = False
523        self.com = None
524        self.serial_port = device.get("com", None)
525        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
526        self.timeout = int(device.get("timeout", TIMEOUT))
527        self.usb_port = device.get("usb_port", None)
528
529    def connect(self):
530        """
531        Open serial.
532        """
533        try:
534            if not self.is_open:
535                import serial
536                self.com = serial.Serial(self.serial_port,
537                                         baudrate=self.baud_rate,
538                                         timeout=self.timeout)
539                self.is_open = True
540        except Exception as error_msg:
541            error = "connect %s serial failed, please make sure this port is" \
542                    " not occupied, error is %s[00401]" % \
543                   (self.serial_port, str(error_msg))
544            raise LiteDeviceConnectError(error, error_no="00401")
545
546    def close(self):
547        """
548        Close serial.
549        """
550        try:
551            if not self.com:
552                return
553            if self.is_open:
554                self.com.close()
555            self.is_open = False
556        except (ConnectionError, Exception) as _:
557            error_message = "Local device is disconnected abnormally"
558            LOG.error(error_message, error_no="00401")
559
560    def execute_command_with_timeout(self, **kwargs):
561        """
562        Execute command on the serial and read all the output from the serial.
563        """
564        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)
565
566    def execute_command(self, command):
567        """
568        Execute command on the serial and read all the output from the serial.
569        """
570        LiteHelper.execute_local_command(self.com, command)
571
572
573CONTROLLER_DICT = {
574    "local": LocalController,
575    "remote": RemoteController,
576}
577