• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import re
20import telnetlib
21import time
22import os
23import threading
24
25from xdevice import DeviceOsType
26from xdevice import ConfigConst
27from xdevice import DeviceLabelType
28from xdevice import ModeType
29from xdevice import IDevice
30from xdevice import platform_logger
31from xdevice import DeviceAllocationState
32from xdevice import Plugin
33from xdevice import exec_cmd
34from xdevice import convert_serial
35from xdevice import convert_ip
36from xdevice import check_mode
37
38from ohos.exception import LiteDeviceConnectError
39from ohos.exception import LiteDeviceTimeout
40from ohos.exception import LiteParamError
41from ohos.environment.dmlib_lite import LiteHelper
42from ohos.constants import ComType
43
44LOG = platform_logger("DeviceLite")
45TIMEOUT = 90
46RETRY_ATTEMPTS = 0
47HDC = "litehdc.exe"
48DEFAULT_BAUD_RATE = 115200
49
50
51def get_hdc_path():
52    from xdevice import Variables
53    user_path = os.path.join(Variables.exec_dir, "resource/tools")
54    top_user_path = os.path.join(Variables.top_dir, "config")
55    config_path = os.path.join(Variables.res_dir, "config")
56    paths = [user_path, top_user_path, config_path]
57
58    file_path = ""
59    for path in paths:
60        if os.path.exists(os.path.abspath(os.path.join(
61                path, HDC))):
62            file_path = os.path.abspath(os.path.join(
63                path, HDC))
64            break
65
66    if os.path.exists(file_path):
67        return file_path
68    else:
69        raise LiteParamError("litehdc.exe not found", error_no="00108")
70
71
72def parse_available_com(com_str):
73    com_str = com_str.replace("\r", " ")
74    com_list = com_str.split("\n")
75    for index, item in enumerate(com_list):
76        com_list[index] = item.strip().strip(b'\x00'.decode())
77    return com_list
78
79
80def perform_device_action(func):
81    def device_action(self, *args, **kwargs):
82        if not self.get_recover_state():
83            LOG.debug("Device %s %s is false" % (self.device_sn,
84                                                 ConfigConst.recover_state))
85            return "", "", ""
86
87        tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
88        retry = tmp + 1 if tmp > 0 else 1
89        exception = None
90        for num in range(retry):
91            try:
92                result = func(self, *args, **kwargs)
93                return result
94            except LiteDeviceTimeout as error:
95                LOG.error(error)
96                exception = error
97                if num:
98                    self.recover_device()
99            except Exception as error:
100                LOG.error(error)
101                exception = error
102        raise exception
103
104    return device_action
105
106
107@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
108class DeviceLite(IDevice):
109    """
110    Class representing an device lite device.
111
112    Each object of this class represents one device lite device in xDevice.
113
114    Attributes:
115        device_connect_type: A string that's the type of lite device
116    """
117    device_os_type = DeviceOsType.lite
118    device_allocation_state = DeviceAllocationState.available
119
120    def __init__(self):
121        self.device_sn = ""
122        self.label = ""
123        self.device_connect_type = ""
124        self.device_kernel = ""
125        self.device = None
126        self.ifconfig = None
127        self.device_id = None
128        self.extend_value = {}
129        self.device_lock = threading.RLock()
130
131    def __set_serial__(self, device=None):
132        for item in device:
133            if "ip" in item.keys() and "port" in item.keys():
134                self.device_sn = "remote_%s_%s" % \
135                                 (item.get("ip"), item.get("port"))
136                break
137            elif "type" in item.keys() and "com" in item.keys():
138                self.device_sn = "local_%s" % item.get("com")
139                break
140
141    def __get_serial__(self):
142        return self.device_sn
143
144    def get(self, key=None, default=None):
145        if not key:
146            return default
147        value = getattr(self, key, None)
148        if value:
149            return value
150        else:
151            return self.extend_value.get(key, default)
152
153    def __set_device_kernel__(self, kernel_type=""):
154        self.device_kernel = kernel_type
155
156    def __get_device_kernel__(self):
157        return self.device_kernel
158
159    @staticmethod
160    def _check_watchgt(device):
161        for item in device:
162            if "label" not in item.keys():
163                error_message = "watchGT local label does not exist"
164                raise LiteParamError(error_message, error_no="00108")
165            if "com" not in item.keys() or ("com" in item.keys() and
166                                            not item.get("com")):
167                error_message = "watchGT local com cannot be " \
168                                "empty, please check"
169                raise LiteParamError(error_message, error_no="00108")
170            else:
171                hdc = get_hdc_path()
172                result = exec_cmd([hdc])
173                com_list = parse_available_com(result)
174                if item.get("com").upper() in com_list:
175                    return True
176                else:
177                    error_message = "watchGT local com does not exist"
178                    raise LiteParamError(error_message, error_no="00108")
179
180    @staticmethod
181    def _check_wifiiot_config(device):
182        com_type_set = set()
183        for item in device:
184            if "label" not in item.keys():
185                if "com" not in item.keys() or ("com" in item.keys() and
186                                                not item.get("com")):
187                    error_message = "wifiiot local com cannot be " \
188                                    "empty, please check"
189                    raise LiteParamError(error_message, error_no="00108")
190
191                if "type" not in item.keys() or ("type" not in item.keys() and
192                                                 not item.get("type")):
193                    error_message = "wifiiot com type cannot be " \
194                                    "empty, please check"
195                    raise LiteParamError(error_message, error_no="00108")
196                else:
197                    com_type_set.add(item.get("type"))
198        if len(com_type_set) < 2:
199            error_message = "wifiiot need cmd com and deploy com" \
200                            " at the same time, please check"
201            raise LiteParamError(error_message, error_no="00108")
202
203    @staticmethod
204    def _check_ipcamera_local(device):
205        for item in device:
206            if "label" not in item.keys():
207                if "com" not in item.keys() or ("com" in item.keys() and
208                                                not item.get("com")):
209                    error_message = "ipcamera local com cannot be " \
210                                    "empty, please check"
211                    raise LiteParamError(error_message, error_no="00108")
212
213    @staticmethod
214    def _check_ipcamera_remote(device=None):
215        for item in device:
216            if "label" not in item.keys():
217                if "port" in item.keys() and item.get("port") and not item.get(
218                        "port").isnumeric():
219                    error_message = "ipcamera remote port should be " \
220                                    "a number, please check"
221                    raise LiteParamError(error_message, error_no="00108")
222                elif "port" not in item.keys():
223                    error_message = "ipcamera remote port cannot be" \
224                                    " empty, please check"
225                    raise LiteParamError(error_message, error_no="00108")
226
227
228    def __check_config__(self, device=None):
229        self.set_connect_type(device)
230        if self.label == DeviceLabelType.wifiiot:
231            self._check_wifiiot_config(device)
232        elif self.label == DeviceLabelType.ipcamera and \
233                self.device_connect_type == "local":
234            self._check_ipcamera_local(device)
235        elif self.label == DeviceLabelType.ipcamera and \
236                self.device_connect_type == "remote":
237            self._check_ipcamera_remote(device)
238        elif self.label == DeviceLabelType.watch_gt:
239            self._check_watchgt(device)
240
241    def set_connect_type(self, device):
242        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
243                  r'01]?\d\d?)$'
244        for item in device:
245            if "label" in item.keys():
246                self.label = item.get("label")
247            if "com" in item.keys():
248                self.device_connect_type = "local"
249            if "ip" in item.keys():
250                if re.match(pattern, item.get("ip")):
251                    self.device_connect_type = "remote"
252                else:
253                    error_message = "Remote device ip not in right" \
254                                         "format, please check user_config.xml"
255                    raise LiteParamError(error_message, error_no="00108")
256        if not self.label:
257            error_message = "device label cannot be empty, " \
258                            "please check"
259            raise LiteParamError(error_message, error_no="00108")
260        else:
261            if self.label != DeviceLabelType.wifiiot and \
262                    self.label != DeviceLabelType.ipcamera and \
263                    self.label != DeviceLabelType.watch_gt:
264                error_message = "device label should be ipcamera or" \
265                                     " wifiiot, please check"
266                raise LiteParamError(error_message, error_no="00108")
267        if not self.device_connect_type:
268            error_message = "device com or ip cannot be empty, " \
269                                 "please check"
270            raise LiteParamError(error_message, error_no="00108")
271
272    def __init_device__(self, device):
273        self.__check_config__(device)
274        self.__set_serial__(device)
275        if self.device_connect_type == "remote":
276            self.device = CONTROLLER_DICT.get("remote")(device)
277        else:
278            self.device = CONTROLLER_DICT.get("local")(device)
279
280        self.ifconfig = device[1].get("ifconfig")
281
282    def connect(self):
283        """
284        Connect the device
285
286        """
287        try:
288            self.device.connect()
289        except LiteDeviceConnectError as _:
290            if check_mode(ModeType.decc):
291                LOG.debug("Set device %s recover state to false" %
292                          self.device_sn)
293                self.device_allocation_state = DeviceAllocationState.unusable
294                self.set_recover_state(False)
295            raise
296
297    @perform_device_action
298    def execute_command_with_timeout(self, command="", case_type="",
299                                     timeout=TIMEOUT, **kwargs):
300        """Executes command on the device.
301
302        Args:
303            command: the command to execute
304            case_type: CTest or CppTest
305            timeout: timeout for read result
306            **kwargs: receiver - parser handler input
307
308        Returns:
309            (filter_result, status, error_message)
310
311            filter_result: command execution result
312            status: true or false
313            error_message: command execution error message
314        """
315        receiver = kwargs.get("receiver", None)
316        if self.device_connect_type == "remote":
317            LOG.info("%s execute command shell %s with timeout %ss" %
318                     (convert_serial(self.__get_serial__()), command,
319                      str(timeout)))
320            filter_result, status, error_message = \
321                self.device.execute_command_with_timeout(
322                    command=command,
323                    timeout=timeout,
324                    receiver=receiver)
325        elif self.device_connect_type == "agent":
326            filter_result, status, error_message = \
327                self.device.execute_command_with_timeout(
328                    command=command,
329                    case_type=case_type,
330                    timeout=timeout,
331                    receiver=receiver, type="cmd")
332        else:
333            filter_result, status, error_message = \
334                self.device.execute_command_with_timeout(
335                    command=command,
336                    case_type=case_type,
337                    timeout=timeout,
338                    receiver=receiver)
339        if not receiver:
340            LOG.debug("%s execute result:%s" % (
341                convert_serial(self.__get_serial__()), filter_result))
342        if not status:
343            LOG.debug(
344                "%s error_message:%s" % (convert_serial(self.__get_serial__()),
345                                         error_message))
346        return filter_result, status, error_message
347
348    def recover_device(self):
349        self.reboot()
350
351    def reboot(self):
352        self.connect()
353        filter_result, status, error_message = self. \
354            execute_command_with_timeout(command="reset", timeout=30)
355        if not filter_result:
356            if check_mode(ModeType.decc):
357                LOG.debug("Set device %s recover state to false" %
358                          self.device_sn)
359                self.device_allocation_state = DeviceAllocationState.unusable
360                self.set_recover_state(False)
361        if self.ifconfig:
362            enter_result, _, _ = self.execute_command_with_timeout(command='\r',
363                                                                   timeout=15)
364            if " #" in enter_result or "OHOS #" in enter_result:
365                LOG.info("Reset device %s success" % self.device_sn)
366                self.execute_command_with_timeout(command=self.ifconfig,
367                                                  timeout=5)
368            elif "hisilicon #" in enter_result:
369                LOG.info("Reset device %s fail" % self.device_sn)
370
371            ifconfig_result, _, _ = self.execute_command_with_timeout(
372                command="ifconfig",
373                timeout=5)
374
375    def close(self):
376        """
377        Close the telnet connection with device server or close the local
378        serial
379        """
380        self.device.close()
381
382    def set_recover_state(self, state):
383        with self.device_lock:
384            setattr(self, ConfigConst.recover_state, state)
385
386    def get_recover_state(self, default_state=True):
387        with self.device_lock:
388            state = getattr(self, ConfigConst.recover_state, default_state)
389            return state
390
391
392class RemoteController:
393    """
394    Class representing an device lite remote device.
395    Each object of this class represents one device lite remote device
396    in xDevice.
397    """
398
399    def __init__(self, device):
400        self.host = device[1].get("ip")
401        self.port = int(device[1].get("port"))
402        self.telnet = None
403
404    def connect(self):
405        """
406        Connect the device server
407
408        """
409        try:
410            if self.telnet:
411                return self.telnet
412            self.telnet = telnetlib.Telnet(self.host, self.port,
413                                           timeout=TIMEOUT)
414        except Exception as err_msgs:
415            error_message = "Connect remote lite device failed, host is %s, " \
416                            "port is %s, error is %s" % \
417                            (convert_ip(self.host), self.port, str(err_msgs))
418            raise LiteDeviceConnectError(error_message, error_no="00401")
419        time.sleep(2)
420        self.telnet.set_debuglevel(0)
421        return self.telnet
422
423    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
424                                     receiver=None):
425        """
426        Executes command on the device.
427
428        Parameters:
429            command: the command to execute
430            timeout: timeout for read result
431            receiver: parser handler
432        """
433        return LiteHelper.execute_remote_cmd_with_timeout(
434            self.telnet, command, timeout, receiver)
435
436    def close(self):
437        """
438        Close the telnet connection with device server
439        """
440        try:
441            if not self.telnet:
442                return
443            self.telnet.close()
444            self.telnet = None
445        except (ConnectionError, Exception) as _:
446            error_message = "Remote device is disconnected abnormally"
447            LOG.error(error_message, error_no="00401")
448
449
450class LocalController:
451    def __init__(self, device):
452        """
453        Init Local device.
454        Parameters:
455            device: local device
456        """
457        self.com_dict = {}
458        for item in device:
459            if "com" in item.keys():
460                if "type" in item.keys() and ComType.cmd_com == item.get(
461                        "type"):
462                    self.com_dict[ComType.cmd_com] = ComController(item)
463                elif "type" in item.keys() and ComType.deploy_com == item.get(
464                        "type"):
465                    self.com_dict[ComType.deploy_com] = ComController(item)
466
467    def connect(self, key=ComType.cmd_com):
468        """
469        Open serial.
470        """
471        self.com_dict.get(key).connect()
472
473    def close(self, key=ComType.cmd_com):
474        """
475        Close serial.
476        """
477        if self.com_dict and self.com_dict.get(key):
478            self.com_dict.get(key).close()
479
480    def execute_command_with_timeout(self, **kwargs):
481        """
482        Execute command on the serial and read all the output from the serial.
483        """
484        args = kwargs
485        key = args.get("key", ComType.cmd_com)
486        command = args.get("command", None)
487        case_type = args.get("case_type", "")
488        receiver = args.get("receiver", None)
489        timeout = args.get("timeout", TIMEOUT)
490        return self.com_dict.get(key).execute_command_with_timeout(
491            command=command, case_type=case_type,
492            timeout=timeout, receiver=receiver)
493
494
495class ComController:
496    def __init__(self, device):
497        """
498        Init serial.
499        Parameters:
500            device: local com
501        """
502        self.is_open = False
503        self.com = None
504        self.serial_port = device.get("com", None)
505        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
506        self.timeout = int(device.get("timeout", TIMEOUT))
507        self.usb_port = device.get("usb_port", None)
508
509    def connect(self):
510        """
511        Open serial.
512        """
513        try:
514            if not self.is_open:
515                import serial
516                self.com = serial.Serial(self.serial_port,
517                                         baudrate=self.baud_rate,
518                                         timeout=self.timeout)
519                self.is_open = True
520        except Exception as error_msg:
521            error = "connect %s serial failed, please make sure this port is" \
522                    " not occupied, error is %s[00401]" % \
523                   (self.serial_port, str(error_msg))
524            raise LiteDeviceConnectError(error, error_no="00401")
525
526    def close(self):
527        """
528        Close serial.
529        """
530        try:
531            if not self.com:
532                return
533            if self.is_open:
534                self.com.close()
535            self.is_open = False
536        except (ConnectionError, Exception) as _:
537            error_message = "Local device is disconnected abnormally"
538            LOG.error(error_message, error_no="00401")
539
540    def execute_command_with_timeout(self, **kwargs):
541        """
542        Execute command on the serial and read all the output from the serial.
543        """
544        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)
545
546    def execute_command(self, command):
547        """
548        Execute command on the serial and read all the output from the serial.
549        """
550        LiteHelper.execute_local_command(self.com, command)
551
552
553CONTROLLER_DICT = {
554    "local": LocalController,
555    "remote": RemoteController,
556}
557