• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import re
20import time
21import os
22import threading
23
24from xdevice import DeviceOsType
25from xdevice import DeviceProperties
26from xdevice import ConfigConst
27from xdevice import DeviceLabelType
28from xdevice import ModeType
29from xdevice import IDevice
30from xdevice import platform_logger
31from xdevice import DeviceAllocationState
32from xdevice import Plugin
33from xdevice import exec_cmd
34from xdevice import convert_serial
35from xdevice import convert_mac
36from xdevice import convert_ip
37from xdevice import check_mode
38from xdevice import Platform
39
40from ohos.constants import ComType
41from ohos.environment.dmlib_lite import LiteHelper
42from ohos.error import ErrorMessage
43from ohos.exception import LiteDeviceConnectError
44from ohos.exception import LiteDeviceTimeout
45from ohos.exception import LiteParamError
46
# Module-level logger used by every class and helper in this file.
LOG = platform_logger("DeviceLite")
# Default timeout: used for command reads, the telnet connection and as the
# fallback serial timeout in ComController.
TIMEOUT = 90
# Default value of the "retry" keyword read by perform_device_action.
RETRY_ATTEMPTS = 0
# File name of the hdc executable that get_hdc_path() searches for.
HDC = "litehdc.exe"
# Baud rate used by ComController when the config gives no "baud_rate".
DEFAULT_BAUD_RATE = 115200
52
53
def get_hdc_path():
    """
    Locate the HDC executable.

    The directories "<exec_dir>/resource/tools", "<top_dir>/config" and
    "<res_dir>/config" are probed in that order and the first existing
    candidate wins.

    Returns:
        Absolute path of the HDC executable.

    Raises:
        LiteParamError: the executable is in none of the directories.
    """
    from xdevice import Variables
    search_dirs = (
        os.path.join(Variables.exec_dir, "resource/tools"),
        os.path.join(Variables.top_dir, "config"),
        os.path.join(Variables.res_dir, "config"),
    )

    for directory in search_dirs:
        candidate = os.path.abspath(os.path.join(directory, HDC))
        if os.path.exists(candidate):
            return candidate

    raise LiteParamError(ErrorMessage.Common.Code_0301006)
73
74
def parse_available_com(com_str):
    """
    Split the raw port listing into one cleaned entry per line.

    Carriage returns are treated as blanks, the text is split on "\\n" and
    every entry is stripped of surrounding whitespace and NUL padding.
    Whitespace and NUL characters are removed together, so mixed padding
    such as "COM3 \\x00" yields "COM3" rather than "COM3 ".

    Parameters:
        com_str: text to parse

    Returns:
        List of cleaned entries; blank lines are kept as empty strings.
    """
    com_list = []
    for line in com_str.replace("\r", " ").split("\n"):
        cleaned = line.strip()
        # Keep peeling until neither NUL nor whitespace is left at the ends;
        # a single strip()/strip("\x00") pass misses interleaved padding.
        while cleaned[:1] == "\x00" or cleaned[-1:] == "\x00":
            cleaned = cleaned.strip("\x00").strip()
        com_list.append(cleaned)
    return com_list
81
82
def perform_device_action(func):
    """
    Decorator that guards a device method and retries it on failure.

    The wrapper:
      * returns ("", "", "") without calling the method when the device's
        recover state is false;
      * reads the optional "retry" keyword (default RETRY_ATTEMPTS) and runs
        the method at most retry + 1 times (once when retry <= 0); the
        keyword is still forwarded to the method;
      * logs every exception; after a LiteDeviceTimeout on any attempt other
        than the first it also calls self.recover_device();
      * re-raises the last exception when every attempt failed.

    functools.wraps keeps the decorated method's name and docstring.
    """
    import functools

    @functools.wraps(func)
    def device_action(self, *args, **kwargs):
        if not self.get_recover_state():
            LOG.debug("Device %s %s is false" % (self.device_sn,
                                                 ConfigConst.recover_state))
            return "", "", ""

        # Non-positive retry values still give exactly one attempt.
        attempts = max(int(kwargs.get("retry", RETRY_ATTEMPTS)), 0) + 1
        exception = None
        for num in range(attempts):
            try:
                return func(self, *args, **kwargs)
            except LiteDeviceTimeout as error:
                LOG.error(error)
                exception = error
                # Recovery is skipped for the very first attempt (num == 0).
                if num:
                    self.recover_device()
            except Exception as error:
                LOG.error(error)
                exception = error
        raise exception

    return device_action
108
109
@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite)
class DeviceLite(IDevice):
    """
    Class representing a device lite device.

    Each object of this class represents one device lite device in xDevice.
    The object validates its configuration, derives a serial number from it
    and delegates all I/O to a controller taken from CONTROLLER_DICT.

    Attributes:
        device_sn: serial built by __set_serial__ ("remote_<ip>_<port>" or
            "local_<com>")
        label: device label read from the configuration
        device_connect_type: A string that's the type of lite device
            ("local" or "remote", see set_connect_type)
        device: controller instance created in __init_device__
        ifconfig: optional command from the configuration, sent to the
            device by __init_device__ and reboot
    """
    device_os_type = DeviceOsType.lite
    device_allocation_state = DeviceAllocationState.available

    def __init__(self):
        self.device_sn = ""
        self.label = ""
        self.device_connect_type = ""
        self.device_kernel = ""
        self.device = None
        self.ifconfig = None
        self.device_id = None
        self.extend_value = {}
        # Guards reads/writes of the recover state attribute.
        self.device_lock = threading.RLock()
        self.test_platform = Platform.ohos
        self.device_props = {}
        self.device_description = {}

    def init_description(self):
        """Fill device_description once; later calls are no-ops."""
        if self.device_description:
            return
        self.device_description.update({
            DeviceProperties.sn: convert_serial(self.device_sn),
            DeviceProperties.model: self.label,
            DeviceProperties.type_: self.label,
            DeviceProperties.platform: self.test_platform,
            DeviceProperties.version: "",
            DeviceProperties.others: self.device_props
        })

    def __set_serial__(self, device=None):
        """
        Build device_sn from the first config item that has either
        "ip" and "port" (remote) or "type" and "com" (local).
        """
        for item in device:
            if "ip" in item and "port" in item:
                self.device_sn = "remote_%s_%s" % \
                                 (item.get("ip"), item.get("port"))
                break
            if "type" in item and "com" in item:
                self.device_sn = "local_%s" % item.get("com")
                break

    def __get_serial__(self):
        return self.device_sn

    def get(self, key=None, default=None):
        """
        Return the truthy attribute named key, else extend_value[key],
        else default. An empty key yields default.
        """
        if not key:
            return default
        value = getattr(self, key, None)
        return value if value else self.extend_value.get(key, default)

    def update_device_props(self, props):
        """Store props once; ignored if props already set or not a dict."""
        if self.device_props or not isinstance(props, dict):
            return
        self.device_props.update(props)

    def __set_device_kernel__(self, kernel_type=""):
        self.device_kernel = kernel_type

    def __get_device_kernel__(self):
        return self.device_kernel

    @staticmethod
    def _check_watchgt(device):
        """
        Validate a watch_gt configuration.

        Only the first item is examined: it must carry "label" and a
        non-empty "com" whose upper-cased value appears in the port list
        printed by the HDC tool.

        Returns:
            True when the first item is valid (None for an empty config).

        Raises:
            LiteParamError: label/com missing or com not available.
        """
        for item in device:
            if "label" not in item:
                raise LiteParamError(ErrorMessage.Config.Code_0302027)
            if not item.get("com"):
                raise LiteParamError(ErrorMessage.Config.Code_0302028)
            available = parse_available_com(exec_cmd([get_hdc_path()]))
            if item.get("com").upper() not in available:
                raise LiteParamError(ErrorMessage.Config.Code_0302029)
            return True

    @staticmethod
    def _check_wifiiot_config(device):
        """
        Validate a wifiiot configuration.

        Every item without "label" needs a non-empty "com" and a non-empty
        "type"; at least two distinct types must be configured.

        Raises:
            LiteParamError: on any violation.
        """
        com_type_set = set()
        for item in device:
            if "label" in item:
                continue
            if not item.get("com"):
                raise LiteParamError(ErrorMessage.Config.Code_0302030)
            # An empty "type" is rejected just like a missing one.
            if not item.get("type"):
                raise LiteParamError(ErrorMessage.Config.Code_0302031)
            com_type_set.add(item.get("type"))
        if len(com_type_set) < 2:
            raise LiteParamError(ErrorMessage.Config.Code_0302032)

    @staticmethod
    def _check_ipcamera_local(device):
        """Require a non-empty "com" on every item without "label"."""
        for item in device:
            if "label" not in item and not item.get("com"):
                raise LiteParamError(ErrorMessage.Config.Code_0302033)

    @staticmethod
    def _check_ipcamera_remote(device=None):
        """
        Require "port" on every item without "label"; a non-empty port
        must be numeric.
        """
        for item in device:
            if "label" in item:
                continue
            if "port" not in item:
                raise LiteParamError(ErrorMessage.Config.Code_0302035)
            port = item.get("port")
            if port and not port.isnumeric():
                raise LiteParamError(ErrorMessage.Config.Code_0302034)

    def __check_config__(self, device=None):
        """Detect label/connect type, then run the matching validator."""
        self.set_connect_type(device)
        if self.label == DeviceLabelType.wifiiot:
            self._check_wifiiot_config(device)
        elif self.label == DeviceLabelType.ipcamera:
            if self.device_connect_type == "local":
                self._check_ipcamera_local(device)
            elif self.device_connect_type == "remote":
                self._check_ipcamera_remote(device)
        elif self.label == DeviceLabelType.watch_gt:
            self._check_watchgt(device)

    def set_connect_type(self, device):
        """
        Read label and connection type from the config items.

        An item with "com" selects "local"; an item with a dotted-quad
        "ip" selects "remote".

        Raises:
            LiteParamError: malformed ip, missing or unsupported label, or
                no connection type could be determined.
        """
        pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \
                  r'01]?\d\d?)$'
        for item in device:
            if "label" in item:
                self.label = item.get("label")
            if "com" in item:
                self.device_connect_type = "local"
            if "ip" in item:
                if not re.match(pattern, item.get("ip")):
                    raise LiteParamError(ErrorMessage.Config.Code_0302025)
                self.device_connect_type = "remote"
        if not self.label:
            raise LiteParamError(ErrorMessage.Config.Code_0302026)
        supported = (DeviceLabelType.wifiiot, DeviceLabelType.ipcamera,
                     DeviceLabelType.watch_gt)
        if all(self.label != known for known in supported):
            raise LiteParamError(ErrorMessage.Config.Code_0302020)
        if not self.device_connect_type:
            raise LiteParamError(ErrorMessage.Config.Code_0302021)

    def __init_device__(self, device):
        """
        Validate the config, create the controller and, when the second
        config item provides "ifconfig", connect and send that command.
        """
        self.__check_config__(device)
        self.__set_serial__(device)
        controller_key = \
            "remote" if self.device_connect_type == "remote" else "local"
        self.device = CONTROLLER_DICT.get(controller_key)(device)

        self.ifconfig = device[1].get("ifconfig")
        if self.ifconfig:
            self.connect()
            self.execute_command_with_timeout(command='\r', timeout=5)
            self.execute_command_with_timeout(command=self.ifconfig, timeout=5)

    def _mark_unusable_in_decc(self):
        """In decc mode, flag the device unusable and not recoverable."""
        if check_mode(ModeType.decc):
            LOG.debug("Set device %s recover state to false" %
                      self.device_sn)
            self.device_allocation_state = DeviceAllocationState.unusable
            self.set_recover_state(False)

    def connect(self):
        """
        Connect the device

        Raises:
            LiteDeviceConnectError: re-raised from the controller; in decc
                mode the device is first marked unusable.
        """
        try:
            self.device.connect()
        except LiteDeviceConnectError:
            self._mark_unusable_in_decc()
            raise

    @perform_device_action
    def execute_command_with_timeout(self, command="", case_type="",
                                     timeout=TIMEOUT, **kwargs):
        """Executes command on the device.

        Args:
            command: the command to execute
            case_type: CTest or CppTest
            timeout: timeout for read result
            **kwargs: receiver - parser handler input

        Returns:
            (filter_result, status, error_message)

            filter_result: command execution result
            status: true or false
            error_message: command execution error message
        """
        receiver = kwargs.get("receiver", None)
        call_kwargs = {"command": command, "timeout": timeout,
                       "receiver": receiver}
        if self.device_connect_type == "remote":
            LOG.info("%s execute command shell %s with timeout %ss" %
                     (convert_serial(self.__get_serial__()), command,
                      str(timeout)))
        else:
            # Only non-remote controllers are given the case type.
            call_kwargs["case_type"] = case_type
            if self.device_connect_type == "agent":
                call_kwargs["type"] = "cmd"
        filter_result, status, error_message = \
            self.device.execute_command_with_timeout(**call_kwargs)

        # "BS" markers are removed from the output before it is returned.
        cleaned_result = filter_result.replace("BS", "")
        if not receiver:
            LOG.debug("{} execute result:{}".format(
                convert_serial(self.__get_serial__()),
                convert_mac(cleaned_result)))
        if not status:
            LOG.debug("{} error_message:{}".format(
                convert_serial(self.__get_serial__()),
                convert_mac(error_message.replace("BS", ""))))

        return cleaned_result, status, error_message

    def recover_device(self):
        self.reboot()

    def reboot(self):
        """
        Reset the device and, if an ifconfig command is configured, send
        it again once a shell prompt is seen.

        An empty reply to "reset" marks the device unusable in decc mode.
        """
        self.connect()
        reset_result, _, _ = self.execute_command_with_timeout(
            command="reset", timeout=30)
        if not reset_result:
            self._mark_unusable_in_decc()
        if not self.ifconfig:
            return

        enter_result, _, _ = self.execute_command_with_timeout(
            command='\r', timeout=15)
        # "hisilicon #" contains " #", so it has to be tested first or the
        # failure branch can never run.
        # NOTE(review): presumably "hisilicon #" is the bootloader prompt,
        # i.e. the system did not come up - confirm.
        if "hisilicon #" in enter_result:
            LOG.info("Reset device %s fail" % self.device_sn)
        elif " #" in enter_result:
            # Also covers the "OHOS #" prompt.
            LOG.info("Reset device %s success" % self.device_sn)
            self.execute_command_with_timeout(command=self.ifconfig,
                                              timeout=5)

        self.execute_command_with_timeout(command="ifconfig", timeout=5)

    def close(self):
        """
        Close the telnet connection with device server or close the local
        serial
        """
        self.device.close()

    def set_recover_state(self, state):
        """Store the recover state under the device lock."""
        with self.device_lock:
            setattr(self, ConfigConst.recover_state, state)

    def get_recover_state(self, default_state=True):
        """Read the recover state under the device lock."""
        with self.device_lock:
            return getattr(self, ConfigConst.recover_state, default_state)
394
395
class RemoteController:
    """
    Class representing a device lite remote device.
    Each object of this class represents one device lite remote device
    in xDevice.

    Host and port are taken from the second item of the device config.
    """

    def __init__(self, device):
        self.host = device[1].get("ip")
        self.port = int(device[1].get("port"))
        self.telnet = None

    def connect(self):
        """
        Connect the device server

        Returns:
            The telnet handle; an existing handle is reused as is.

        Raises:
            LiteDeviceConnectError: the connection could not be opened.
        """
        try:
            if self.telnet:
                return self.telnet
            import telnetlib
            self.telnet = telnetlib.Telnet(self.host, self.port,
                                           timeout=TIMEOUT)
        except Exception as error:
            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303008.format(
                convert_ip(self.host), self.port, str(error))) from error
        time.sleep(2)
        self.telnet.set_debuglevel(0)
        return self.telnet

    def execute_command_with_timeout(self, command="", timeout=TIMEOUT,
                                     receiver=None):
        """
        Executes command on the device.

        Parameters:
            command: the command to execute
            timeout: timeout for read result
            receiver: parser handler
        """
        return LiteHelper.execute_remote_cmd_with_timeout(
            self.telnet, command, timeout, receiver)

    def close(self):
        """
        Close the telnet connection with device server

        The handle is dropped even when closing fails, so that a later
        connect() opens a fresh connection instead of reusing a dead one.
        """
        if not self.telnet:
            return
        try:
            self.telnet.close()
        except ConnectionError:
            error_message = "Remote device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")
        finally:
            self.telnet = None
450
451
class LocalController:
    """Holds one ComController per configured serial port type."""

    def __init__(self, device):
        """
        Init Local device.
        Parameters:
            device: local device
        """
        self.com_dict = {}
        for item in device:
            if "com" not in item or "type" not in item:
                continue
            # Only the command and deploy port types get a controller.
            for com_type in (ComType.cmd_com, ComType.deploy_com):
                if com_type == item.get("type"):
                    self.com_dict[com_type] = ComController(item)
                    break

    def connect(self, key=ComType.cmd_com):
        """
        Open serial.
        """
        controller = self.com_dict.get(key)
        controller.connect()

    def close(self, key=ComType.cmd_com):
        """
        Close serial.
        """
        if not self.com_dict:
            return
        controller = self.com_dict.get(key)
        if controller:
            controller.close()

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.

        Recognised keywords: key (port type, defaults to the command port),
        command, case_type, receiver and timeout.
        """
        forwarded = {
            "command": kwargs.get("command", None),
            "case_type": kwargs.get("case_type", ""),
            "timeout": kwargs.get("timeout", TIMEOUT),
            "receiver": kwargs.get("receiver", None),
        }
        controller = self.com_dict.get(kwargs.get("key", ComType.cmd_com))
        return controller.execute_command_with_timeout(**forwarded)
495
496
class ComController:
    """Wraps a single serial port described by one config item."""

    def __init__(self, device):
        """
        Init serial.
        Parameters:
            device: local com
        """
        self.is_open = False
        self.com = None
        self.serial_port = device.get("com", None)
        self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
        self.timeout = int(device.get("timeout", TIMEOUT))
        self.usb_port = device.get("usb_port", None)

    def connect(self):
        """
        Open serial.

        Returns:
            The serial handle. When the port is already open the existing
            handle is returned, matching RemoteController.connect.

        Raises:
            LiteDeviceConnectError: the port could not be opened.
        """
        if self.is_open:
            return self.com
        try:
            import serial
            self.com = serial.Serial(self.serial_port,
                                     baudrate=self.baud_rate,
                                     timeout=self.timeout)
        except Exception as error:
            raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303009.format(
                self.serial_port, str(error))) from error
        self.is_open = True
        return self.com

    def close(self):
        """
        Close serial.
        """
        try:
            if not self.com:
                return
            if self.is_open:
                self.com.close()
            self.is_open = False
        except ConnectionError:
            error_message = "Local device is disconnected abnormally"
            LOG.error(error_message, error_no="00401")

    def execute_command_with_timeout(self, **kwargs):
        """
        Execute command on the serial and read all the output from the serial.
        """
        return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs)

    def execute_command(self, command):
        """
        Execute command on the serial and read all the output from the serial.
        """
        LiteHelper.execute_local_command(self.com, command)
553
554
# Maps the connection type chosen in DeviceLite.__init_device__ to the
# controller class that is instantiated with the device config.
CONTROLLER_DICT = {
    "local": LocalController,
    "remote": RemoteController,
}
559