• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import re
20import telnetlib
21import time
22import os
23import threading
24
25from xdevice import DeviceOsType
26from xdevice import ConfigConst
27from xdevice import DeviceLabelType
28from xdevice import ModeType
29from xdevice import IDevice
30from xdevice import platform_logger
31from xdevice import DeviceAllocationState
32from xdevice import Plugin
33from xdevice import exec_cmd
34from xdevice import convert_serial
35from xdevice import convert_ip
36from xdevice import check_mode
37
38from ohos.exception import LiteDeviceConnectError
39from ohos.exception import LiteDeviceTimeout
40from ohos.exception import LiteParamError
41from ohos.environment.dmlib_lite import LiteHelper
42from ohos.constants import ComType
43
# Module-wide logger, registered under the name "DeviceLite".
LOG = platform_logger("DeviceLite")
# Default timeout used for command execution, for opening the telnet
# connection and for the serial port when the config gives no "timeout".
TIMEOUT = 90
# Default for the "retry" keyword argument read by perform_device_action.
RETRY_ATTEMPTS = 0
# File name of the litehdc executable that get_hdc_path() searches for.
HDC = "litehdc.exe"
# Baud rate used by ComController when the config gives no "baud_rate".
DEFAULT_BAUD_RATE = 115200
49
50
def get_hdc_path():
    """
    Locate the litehdc executable.

    The file named by HDC is searched, in order, in
    <exec_dir>/resource/tools, <top_dir>/config and <res_dir>/config.

    Returns:
        The absolute path of the first existing candidate.

    Raises:
        LiteParamError: if none of the candidate paths exists.
    """
    from xdevice import Variables

    search_dirs = (
        os.path.join(Variables.exec_dir, "resource/tools"),
        os.path.join(Variables.top_dir, "config"),
        os.path.join(Variables.res_dir, "config"),
    )
    for directory in search_dirs:
        candidate = os.path.abspath(os.path.join(directory, HDC))
        if os.path.exists(candidate):
            return candidate

    raise LiteParamError("litehdc.exe not found", error_no="00108")
70
71
def parse_available_com(com_str):
    """
    Split raw tool output into a list of cleaned entries, one per line.

    Carriage returns are treated as spaces, the text is split on newlines
    and every entry has surrounding whitespace and NUL padding removed.
    Whitespace and NUL characters are stripped in any interleaving, so an
    entry such as "\\x00 COM3 \\x00" becomes "COM3" rather than " COM3 ".

    Args:
        com_str: text to parse (a str).

    Returns:
        A list with one cleaned string per input line; empty lines are
        kept as empty strings.
    """
    com_list = []
    for raw_entry in com_str.replace("\r", " ").split("\n"):
        entry = raw_entry.strip()
        # A single strip("\x00") can expose new whitespace (and vice versa),
        # so keep peeling until neither is left at the edges.
        while entry.startswith("\x00") or entry.endswith("\x00"):
            entry = entry.strip("\x00").strip()
        com_list.append(entry)
    return com_list
78
79
def perform_device_action(func):
    """
    Decorator adding recover-state checking and retries to a device method.

    Behavior of the wrapped method:
      * If self.get_recover_state() is false, func is not called and
        ("", "", "") is returned.
      * The optional "retry" keyword argument (default RETRY_ATTEMPTS) sets
        the number of extra attempts; a value <= 0 means a single attempt.
      * On LiteDeviceTimeout the error is logged and, from the second failed
        attempt onwards, self.recover_device() is called before retrying.
      * Any other exception is logged and the call is retried.
      * If every attempt fails, the last exception is re-raised.

    Args:
        func: the method to wrap; it receives self plus the original
            positional and keyword arguments (including "retry").

    Returns:
        The wrapping function, carrying func's name and docstring.
    """
    from functools import wraps

    @wraps(func)
    def device_action(self, *args, **kwargs):
        if not self.get_recover_state():
            LOG.debug("Device %s %s is false" % (self.device_sn,
                                                 ConfigConst.recover_state))
            return "", "", ""

        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
        retry = tmp + 1 if tmp > 0 else 1
        exception = None
        for num in range(retry):
            try:
                return func(self, *args, **kwargs)
            except LiteDeviceTimeout as error:
                LOG.error(error)
                exception = error
                # The first timeout is simply retried; recovery only kicks
                # in once a retry has timed out as well.
                if num:
                    self.recover_device()
            except Exception as error:
                LOG.error(error)
                exception = error
        raise exception

    return device_action
105
106
107@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
108class DeviceLite(IDevice):
109    """
    Class representing a lite device.

    Each object of this class represents one lite device in xDevice.
113
114    Attributes:
115        device_connect_type: A string that's the type of lite device
116    """
117    device_os_type = DeviceOsType.lite
118    device_allocation_state = DeviceAllocationState.available
119
120    def __init__(self):
121        self.device_sn = ""
122        self.label = ""
123        self.device_connect_type = ""
124        self.device_kernel = ""
125        self.device = None
126        self.ifconfig = None
127        self.extend_value = {}
128        self.device_lock = threading.RLock()
129
130    def __set_serial__(self, device=None):
131        for item in device:
132            if "ip" in item.keys() and "port" in item.keys():
133                self.device_sn = "remote_%s_%s" % \
134                                 (item.get("ip"), item.get("port"))
135                break
136            elif "type" in item.keys() and "com" in item.keys():
137                self.device_sn = "local_%s" % item.get("com")
138                break
139
140    def __get_serial__(self):
141        return self.device_sn
142
143    def get(self, key=None, default=None):
144        if not key:
145            return default
146        value = getattr(self, key, None)
147        if value:
148            return value
149        else:
150            return self.extend_value.get(key, default)
151
152    def __set_device_kernel__(self, kernel_type=""):
153        self.device_kernel = kernel_type
154
155    def __get_device_kernel__(self):
156        return self.device_kernel
157
158    @staticmethod
159    def _check_watchgt(device):
160        for item in device:
161            if "label" not in item.keys():
162                error_message = "watchGT local label does not exist"
163                raise LiteParamError(error_message, error_no="00108")
164            if "com" not in item.keys() or ("com" in item.keys() and
165                                            not item.get("com")):
166                error_message = "watchGT local com cannot be " \
167                                "empty, please check"
168                raise LiteParamError(error_message, error_no="00108")
169            else:
170                hdc = get_hdc_path()
171                result = exec_cmd([hdc])
172                com_list = parse_available_com(result)
173                if item.get("com").upper() in com_list:
174                    return True
175                else:
176                    error_message = "watchGT local com does not exist"
177                    raise LiteParamError(error_message, error_no="00108")
178
179    @staticmethod
180    def _check_wifiiot_config(device):
181        com_type_set = set()
182        for item in device:
183            if "label" not in item.keys():
184                if "com" not in item.keys() or ("com" in item.keys() and
185                                                not item.get("com")):
186                    error_message = "wifiiot local com cannot be " \
187                                    "empty, please check"
188                    raise LiteParamError(error_message, error_no="00108")
189
190                if "type" not in item.keys() or ("type" not in item.keys() and
191                                                 not item.get("type")):
192                    error_message = "wifiiot com type cannot be " \
193                                    "empty, please check"
194                    raise LiteParamError(error_message, error_no="00108")
195                else:
196                    com_type_set.add(item.get("type"))
197        if len(com_type_set) < 2:
198            error_message = "wifiiot need cmd com and deploy com" \
199                            " at the same time, please check"
200            raise LiteParamError(error_message, error_no="00108")
201
202    @staticmethod
203    def _check_ipcamera_local(device):
204        for item in device:
205            if "label" not in item.keys():
206                if "com" not in item.keys() or ("com" in item.keys() and
207                                                not item.get("com")):
208                    error_message = "ipcamera local com cannot be " \
209                                    "empty, please check"
210                    raise LiteParamError(error_message, error_no="00108")
211
212    @staticmethod
213    def _check_ipcamera_remote(device=None):
214        for item in device:
215            if "label" not in item.keys():
216                if "port" in item.keys() and item.get("port") and not item.get(
217                        "port").isnumeric():
218                    error_message = "ipcamera remote port should be " \
219                                    "a number, please check"
220                    raise LiteParamError(error_message, error_no="00108")
221                elif "port" not in item.keys():
222                    error_message = "ipcamera remote port cannot be" \
223                                    " empty, please check"
224                    raise LiteParamError(error_message, error_no="00108")
225
226
227    def __check_config__(self, device=None):
228        self.set_connect_type(device)
229        if self.label == DeviceLabelType.wifiiot:
230            self._check_wifiiot_config(device)
231        elif self.label == DeviceLabelType.ipcamera and \
232                self.device_connect_type == "local":
233            self._check_ipcamera_local(device)
234        elif self.label == DeviceLabelType.ipcamera and \
235                self.device_connect_type == "remote":
236            self._check_ipcamera_remote(device)
237        elif self.label == DeviceLabelType.watch_gt:
238            self._check_watchgt(device)
239
240    def set_connect_type(self, device):
241        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
242                  r'01]?\d\d?)$'
243        for item in device:
244            if "label" in item.keys():
245                self.label = item.get("label")
246            if "com" in item.keys():
247                self.device_connect_type = "local"
248            if "ip" in item.keys():
249                if re.match(pattern, item.get("ip")):
250                    self.device_connect_type = "remote"
251                else:
252                    error_message = "Remote device ip not in right" \
253                                         "format, please check user_config.xml"
254                    raise LiteParamError(error_message, error_no="00108")
255        if not self.label:
256            error_message = "device label cannot be empty, " \
257                            "please check"
258            raise LiteParamError(error_message, error_no="00108")
259        else:
260            if self.label != DeviceLabelType.wifiiot and \
261                    self.label != DeviceLabelType.ipcamera and \
262                    self.label != DeviceLabelType.watch_gt:
263                error_message = "device label should be ipcamera or" \
264                                     " wifiiot, please check"
265                raise LiteParamError(error_message, error_no="00108")
266        if not self.device_connect_type:
267            error_message = "device com or ip cannot be empty, " \
268                                 "please check"
269            raise LiteParamError(error_message, error_no="00108")
270
271    def __init_device__(self, device):
272        self.__check_config__(device)
273        self.__set_serial__(device)
274        if self.device_connect_type == "remote":
275            self.device = CONTROLLER_DICT.get("remote")(device)
276        else:
277            self.device = CONTROLLER_DICT.get("local")(device)
278
279        self.ifconfig = device[1].get("ifconfig")
280
281    def connect(self):
282        """
283        Connect the device
284
285        """
286        try:
287            self.device.connect()
288        except LiteDeviceConnectError as _:
289            if check_mode(ModeType.decc):
290                LOG.debug("Set device %s recover state to false" %
291                          self.device_sn)
292                self.device_allocation_state = DeviceAllocationState.unusable
293                self.set_recover_state(False)
294            raise
295
296    @perform_device_action
297    def execute_command_with_timeout(self, command="", case_type="",
298                                     timeout=TIMEOUT, **kwargs):
299        """Executes command on the device.
300
301        Args:
302            command: the command to execute
303            case_type: CTest or CppTest
304            timeout: timeout for read result
305            **kwargs: receiver - parser handler input
306
307        Returns:
308            (filter_result, status, error_message)
309
310            filter_result: command execution result
311            status: true or false
312            error_message: command execution error message
313        """
314        receiver = kwargs.get("receiver", None)
315        if self.device_connect_type == "remote":
316            LOG.info("%s execute command shell %s with timeout %ss" %
317                     (convert_serial(self.__get_serial__()), command,
318                      str(timeout)))
319            filter_result, status, error_message = \
320                self.device.execute_command_with_timeout(
321                    command=command,
322                    timeout=timeout,
323                    receiver=receiver)
324        elif self.device_connect_type == "agent":
325            filter_result, status, error_message = \
326                self.device.execute_command_with_timeout(
327                    command=command,
328                    case_type=case_type,
329                    timeout=timeout,
330                    receiver=receiver, type="cmd")
331        else:
332            filter_result, status, error_message = \
333                self.device.execute_command_with_timeout(
334                    command=command,
335                    case_type=case_type,
336                    timeout=timeout,
337                    receiver=receiver)
338        if not receiver:
339            LOG.debug("%s execute result:%s" % (
340                convert_serial(self.__get_serial__()), filter_result))
341        if not status:
342            LOG.debug(
343                "%s error_message:%s" % (convert_serial(self.__get_serial__()),
344                                         error_message))
345        return filter_result, status, error_message
346
347    def recover_device(self):
348        self.reboot()
349
350    def reboot(self):
351        self.connect()
352        filter_result, status, error_message = self. \
353            execute_command_with_timeout(command="reset", timeout=30)
354        if not filter_result:
355            if check_mode(ModeType.decc):
356                LOG.debug("Set device %s recover state to false" %
357                          self.device_sn)
358                self.device_allocation_state = DeviceAllocationState.unusable
359                self.set_recover_state(False)
360        if self.ifconfig:
361            enter_result, _, _ = self.execute_command_with_timeout(command='\r',
362                                                                   timeout=15)
363            if " #" in enter_result or "OHOS #" in enter_result:
364                LOG.info("Reset device %s success" % self.device_sn)
365                self.execute_command_with_timeout(command=self.ifconfig,
366                                                  timeout=5)
367            elif "hisilicon #" in enter_result:
368                LOG.info("Reset device %s fail" % self.device_sn)
369
370            ifconfig_result, _, _ = self.execute_command_with_timeout(
371                command="ifconfig",
372                timeout=5)
373
374    def close(self):
375        """
376        Close the telnet connection with device server or close the local
377        serial
378        """
379        self.device.close()
380
381    def set_recover_state(self, state):
382        with self.device_lock:
383            setattr(self, ConfigConst.recover_state, state)
384
385    def get_recover_state(self, default_state=True):
386        with self.device_lock:
387            state = getattr(self, ConfigConst.recover_state, default_state)
388            return state
389
390
class RemoteController:
    """
    Controller for a remote lite device reached over telnet.

    Each object of this class represents one remote lite device in xDevice.
    """

    def __init__(self, device):
        """
        Read host and port from the second entry of the device config.

        Parameters:
            device: config list; device[1] must provide "ip" and a "port"
                that int() can convert
        """
        self.host = device[1].get("ip")
        self.port = int(device[1].get("port"))
        self.telnet = None

    def connect(self):
        """
        Connect to the device server, reusing an existing connection.

        Returns:
            The telnet connection object.

        Raises:
            LiteDeviceConnectError: if the connection cannot be opened; the
                original error is chained as its cause.
        """
        if self.telnet:
            return self.telnet
        try:
            self.telnet = telnetlib.Telnet(self.host, self.port,
                                           timeout=TIMEOUT)
        except Exception as err_msgs:
            error_message = "Connect remote lite device failed, host is %s, " \
                            "port is %s, error is %s" % \
                            (convert_ip(self.host), self.port, str(err_msgs))
            raise LiteDeviceConnectError(error_message,
                                         error_no="00401") from err_msgs
        time.sleep(2)
        self.telnet.set_debuglevel(0)
        return self.telnet

    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
                                     receiver=None):
        """
        Executes command on the device.

        Parameters:
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler

        Returns:
            Whatever LiteHelper.execute_remote_cmd_with_timeout returns.
        """
        return LiteHelper.execute_remote_cmd_with_timeout(
            self.telnet, command, timeout, receiver)

    def close(self):
        """
        Close the telnet connection with the device server.

        A failure while closing is logged, not raised. The connection
        reference is dropped either way so that the next connect() opens a
        fresh connection instead of returning the broken one.
        """
        if not self.telnet:
            return
        try:
            self.telnet.close()
        except Exception:
            error_message = "Remote device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")
        finally:
            self.telnet = None
441                return
442            self.telnet.close()
443            self.telnet = None
444        except (ConnectionError, Exception) as _:
445            error_message = "Remote device is disconnected abnormally"
446            LOG.error(error_message, error_no="00401")
447
448
class LocalController:
    """Controller for a local lite device, holding one serial per com type."""

    def __init__(self, device):
        """
        Init Local device.

        A ComController is created for every config entry that has a "com"
        key and whose "type" is the cmd or deploy com type.

        Parameters:
            device: local device config (iterable of dicts)
        """
        self.com_dict = {}
        for entry in device:
            if "com" not in entry or "type" not in entry:
                continue
            com_type = entry.get("type")
            if ComType.cmd_com == com_type:
                self.com_dict[ComType.cmd_com] = ComController(entry)
            elif ComType.deploy_com == com_type:
                self.com_dict[ComType.deploy_com] = ComController(entry)

    def connect(self, key=ComType.cmd_com):
        """
        Open the serial registered under key (the cmd com by default).
        """
        controller = self.com_dict.get(key)
        controller.connect()

    def close(self, key=ComType.cmd_com):
        """
        Close the serial registered under key, if there is one.
        """
        if not self.com_dict:
            return
        if self.com_dict.get(key):
            self.com_dict.get(key).close()

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Recognised keywords: key (com type, cmd com by default), command,
        case_type, receiver and timeout; other keywords are ignored.
        """
        controller = self.com_dict.get(kwargs.get("key", ComType.cmd_com))
        return controller.execute_command_with_timeout(
            command=kwargs.get("command", None),
            case_type=kwargs.get("case_type", ""),
            timeout=kwargs.get("timeout", TIMEOUT),
            receiver=kwargs.get("receiver", None))
492
493
class ComController:
    """Wrapper around one serial port of a local lite device."""

    def __init__(self, device):
        """
        Init serial.

        Parameters:
            device: local com config dict; reads "com", "baud_rate",
                "timeout" and "usb_port"
        """
        self.serial_port = device.get("com", None)
        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
        self.timeout = int(device.get("timeout", TIMEOUT))
        self.usb_port = device.get("usb_port", None)
        # Connection state; the port itself is opened by connect().
        self.com = None
        self.is_open = False

    def connect(self):
        """
        Open the serial port unless it is already open.

        Raises:
            LiteDeviceConnectError: if importing or opening the serial
                fails for any reason.
        """
        try:
            if self.is_open:
                return
            import serial
            self.com = serial.Serial(self.serial_port,
                                     baudrate=self.baud_rate,
                                     timeout=self.timeout)
            self.is_open = True
        except Exception as open_error:
            message = "connect %s serial failed, please make sure this " \
                      "port is not occupied, error is %s[00401]" % \
                      (self.serial_port, str(open_error))
            raise LiteDeviceConnectError(message, error_no="00401")

    def close(self):
        """
        Close the serial port; a failure is logged, not raised.
        """
        try:
            if self.com:
                if self.is_open:
                    self.com.close()
                self.is_open = False
        except Exception:
            LOG.error("Local device is disconnected abnormally",
                      error_no="00401")

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.
        """
        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)

    def execute_command(self, command):
        """
        Execute command on the serial; nothing is returned.
        """
        LiteHelper.execute_local_command(self.com, command)
550
551
# Maps a connect type to the controller class that
# DeviceLite.__init_device__ instantiates for it.
CONTROLLER_DICT = {
    "local": LocalController,
    "remote": RemoteController,
}
556