• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import re
21import random
22import string
23import shutil
24import subprocess
25
26from xdevice import platform_logger
27from xdevice import Plugin
28from xdevice import CKit
29from xdevice import ComType
30from xdevice import ParamError
31from xdevice import LiteDeviceError
32from xdevice import ITestKit
33from xdevice import get_config_value
34from xdevice import get_file_absolute_path
35from xdevice import LiteDeviceConnectError
36from xdevice import DeviceAllocationState
37
38
# Names exported by "from <this module> import *".
__all__ = ["DeployKit"]
# Logger shared by everything in this module.
LOG = platform_logger("KitLite")

# Fallback value for the "burn_command" kit option: a comma-separated list of
# hex byte literals, split into individual tokens by
# DeployKit.__check_config__.
RESET_CMD = (
    "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, "
    "0x61, 0x94"
)
44
45
@Plugin(type=Plugin.TEST_KIT, id=CKit.deploy)
class DeployKit(ITestKit):
    """
    Test kit that resets a device through its cmd serial port and then runs
    the HiBurn tool against its deploy serial port to burn a file.
    """

    def __init__(self):
        # Every option below is populated by __check_config__.
        self.burn_file = ""
        self.burn_command = ""
        self.timeout = ""
        self.paths = ""

    def __check_config__(self, config):
        """
        Read the kit options from the configuration and validate them.
        Parameters:
            config: the kit configuration handed to get_config_value
        Raises:
            ParamError: if 'timeout' is missing, not an integer or not
                        positive, or if 'burn_file' is empty
        """
        raw_timeout = get_config_value(
            'timeout', config, is_list=False, default=0)
        try:
            # The configured value is scaled by 1000 before being used
            # (presumably seconds to milliseconds - confirm with the
            # firmware's AT+RST contract).
            timeout_value = int(raw_timeout) * 1000
            self.timeout = str(timeout_value)
        except (TypeError, ValueError):
            # Report a malformed timeout as an invalid configuration below
            # instead of leaking a bare ValueError/TypeError to the caller.
            timeout_value = 0
            self.timeout = str(raw_timeout)
        self.burn_file = get_config_value('burn_file', config, is_list=False)
        burn_command = get_config_value('burn_command', config, is_list=False,
                                        default=RESET_CMD)
        # "0xEF, 0xBE" -> ["0xEF", "0xBE"]
        self.burn_command = burn_command.replace(" ", "").split(",")
        self.paths = get_config_value('paths', config)
        if timeout_value <= 0 or not self.burn_file:
            msg = "The config for deploy kit is invalid with timeout:{}, " \
                  "burn_file:{}".format(self.timeout, self.burn_file)
            raise ParamError(msg, error_no="00108")

    def _reset(self, device):
        """
        Send the 'AT+RST' command, parameterised with the configured timeout,
        over the cmd serial port of the device.
        Parameters:
            device: the device whose com_dict holds the cmd serial port
        Raises:
            LiteDeviceError: if no cmd serial port is configured, or if
                             connecting or sending the command fails (the
                             device is then marked unusable)
        """
        cmd_com = device.device.com_dict.get(ComType.cmd_com)
        if cmd_com is None:
            # Fail with a clear message rather than an AttributeError on
            # None further down.
            raise LiteDeviceError(
                "The cmd serial port to reset the device is not configured",
                error_no="00108")
        try:
            cmd_com.connect()
            cmd_com.execute_command(command='AT+RST={}'.format(self.timeout))
        except (LiteDeviceConnectError, IOError) as error:
            error_no = getattr(error, "error_no", "00000")
            device.device_allocation_state = DeviceAllocationState.unusable
            LOG.error(
                "The exception {} happened in deploy kit running".format(
                    error), error_no=error_no)
            raise LiteDeviceError("%s port set_up wifiiot failed" %
                                  cmd_com.serial_port,
                                  error_no=error_no) from error
        finally:
            # Single close for both the success and the failure path.
            cmd_com.close()

    def _send_file(self, device, source_file):
        """
        Burn source_file onto the device by running a temporary copy of the
        HiBurn tool against the deploy serial port.
        Parameters:
            device: the device whose com_dict holds the deploy serial port
            source_file: path of the file passed to the tool as '-bin:'
        Raises:
            LiteDeviceError: if the deploy serial port is missing or its name
                             has no trailing number, if the burn tool cannot
                             be copied, or if the tool exits with a non-zero
                             code (the device is then marked unusable)
        """
        burn_tool_name = "HiBurn.exe" if os.name == "nt" else "HiBurn"
        burn_tool_path = get_file_absolute_path(
            os.path.join("tools", burn_tool_name), self.paths)

        deploy_com = device.device.com_dict.get(ComType.deploy_com)
        if deploy_com is None:
            raise LiteDeviceError(
                "The serial port to deploy is not configured",
                error_no="00108")
        deploy_serial_port = deploy_com.serial_port
        deploy_baudrate = deploy_com.baud_rate
        # The tool takes only the trailing number of the port name.
        port_number = re.findall(r'\d+$', deploy_serial_port)
        if not port_number:
            raise LiteDeviceError("The config of serial port {} to deploy is "
                                  "invalid".format(deploy_serial_port),
                                  error_no="00108")
        new_temp_tool_path = self.copy_file_as_temp(burn_tool_path, 10)
        if not new_temp_tool_path:
            # copy_file_as_temp returns None when the tool is not a regular
            # file; stop here instead of running the command "None ...".
            raise LiteDeviceError(
                "The burn tool {} is not a file".format(burn_tool_path),
                error_no="00108")
        try:
            # NOTE: getstatusoutput runs this string through the shell, so
            # paths containing spaces are not supported here.
            cmd = '{} -com:{} -bin:{} -signalbaud:{}' \
                .format(new_temp_tool_path, port_number[0], source_file,
                        deploy_baudrate)
            LOG.info('The running cmd is {}'.format(cmd))
            LOG.info('The burn tool is running, please wait..')
            return_code, out = subprocess.getstatusoutput(cmd)
            LOG.info(
                'Deploy kit to execute burn tool finished with return_code: '
                '{} output: {}'.format(return_code, out))
        finally:
            # Remove the temporary copy even if running the tool raised.
            os.remove(new_temp_tool_path)
        if return_code != 0:
            device.device_allocation_state = DeviceAllocationState.unusable
            raise LiteDeviceError("%s port set_up wifiiot failed" %
                                  deploy_serial_port, error_no="00402")

    def __setup__(self, device, **kwargs):
        """
        Execute reset command on the device by cmd serial port and then upload
        patch file by deploy tool.
        Parameters:
            device: the instance of LocalController with one or more
                    ComController
            kwargs: 'source_file' is the file handed to the deploy tool
        """
        source_file = kwargs.get("source_file", None)
        self._reset(device)
        self._send_file(device, source_file)

    def __teardown__(self, device):
        """
        Nothing to clean up for this kit.
        Parameters:
            device: unused
        """
        pass

    def copy_file_as_temp(self, original_file, str_length):
        """
        Copy original_file next to itself under a name extended with a
        random suffix: '<stem>_<random><ext>'.
        Parameters:
            original_file : the original file path
            str_length: the length of random string
        Returns:
            the path of the copy, or None if original_file is not an
            existing regular file
        """
        if not os.path.isfile(original_file):
            return None
        alphabet = string.ascii_letters + string.digits
        # Draw each character independently so str_length may exceed the
        # alphabet size (random.sample would raise ValueError there).
        random_str = "".join(
            random.choice(alphabet) for _ in range(str_length))
        stem, extension = os.path.splitext(original_file)
        new_temp_tool_path = '{}_{}{}'.format(stem, random_str, extension)
        # shutil.copy also copies the permission bits (copyfile copies the
        # contents only), so an executable tool stays executable.
        return shutil.copy(original_file, new_temp_tool_path)