#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import re
import random
import string
import shutil
import subprocess

from xdevice import platform_logger
from xdevice import Plugin
from xdevice import CKit
from xdevice import ComType
from xdevice import ParamError
from xdevice import LiteDeviceError
from xdevice import ITestKit
from xdevice import get_config_value
from xdevice import get_file_absolute_path
from xdevice import LiteDeviceConnectError
from xdevice import DeviceAllocationState


__all__ = ["DeployKit"]
LOG = platform_logger("KitLite")

# Default value of the 'burn_command' config entry: a comma separated list of
# byte literals.
RESET_CMD = "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, " \
            "0x61, 0x94"


@Plugin(type=Plugin.TEST_KIT, id=CKit.deploy)
class DeployKit(ITestKit):
    """
    Test kit that sends a reset command through the cmd serial port and then
    runs the HiBurn tool against the deploy serial port to upload a file.
    """

    def __init__(self):
        # Path of the file to burn, read from the 'burn_file' config entry.
        self.burn_file = ""
        # 'burn_command' config entry split into a list of byte literals.
        self.burn_command = ""
        # Timeout in milliseconds, kept as a string for the AT+RST command.
        self.timeout = ""
        # Value of the 'paths' config entry, used to locate the burn tool.
        self.paths = ""

    def __check_config__(self, config):
        """
        Read and validate the kit configuration.
        Parameters:
            config: the kit configuration handed to get_config_value
        Raises:
            ParamError: if the timeout is missing, not an integer or not
                        positive, or if burn_file is empty
        """
        raw_timeout = get_config_value(
            'timeout', config, is_list=False, default=0)
        self.burn_file = get_config_value('burn_file', config, is_list=False)
        burn_command = get_config_value('burn_command', config, is_list=False,
                                        default=RESET_CMD)
        self.burn_command = burn_command.replace(" ", "").split(",")
        self.paths = get_config_value('paths', config)

        # The config holds seconds while the reset command takes
        # milliseconds. A value that is not an integer is reported as a
        # ParamError below instead of escaping as ValueError/TypeError.
        try:
            timeout_ms = int(raw_timeout) * 1000
        except (TypeError, ValueError):
            timeout_ms = None
        self.timeout = str(raw_timeout if timeout_ms is None else timeout_ms)

        if timeout_ms is None or timeout_ms <= 0 or not self.burn_file:
            msg = "The config for deploy kit is invalid with timeout:{}, " \
                  "burn_file:{}".format(self.timeout, self.burn_file)
            raise ParamError(msg, error_no="00108")

    def _reset(self, device):
        """
        Send the 'AT+RST=<timeout>' command through the cmd serial port.
        Parameters:
            device: the device whose com_dict holds the cmd serial port
        Raises:
            LiteDeviceError: if no cmd serial port is configured, or if
                             connecting or sending the command fails
        """
        cmd_com = device.device.com_dict.get(ComType.cmd_com)
        if cmd_com is None:
            # Fail with a domain error instead of an AttributeError on None.
            raise LiteDeviceError(
                "The config of cmd serial port to deploy is missing",
                error_no="00108")
        try:
            cmd_com.connect()
            cmd_com.execute_command(command='AT+RST={}'.format(self.timeout))
        except (LiteDeviceConnectError, IOError) as error:
            device.device_allocation_state = DeviceAllocationState.unusable
            error_no = getattr(error, "error_no", "00000")
            LOG.error(
                "The exception {} happened in deploy kit running".format(
                    error), error_no=error_no)
            raise LiteDeviceError("%s port set_up wifiiot failed" %
                                  cmd_com.serial_port,
                                  error_no=error_no) from error
        finally:
            # Single close for both the success and the failure path.
            cmd_com.close()

    def _send_file(self, device, source_file):
        """
        Burn source_file onto the device with a temporary copy of HiBurn.
        Parameters:
            device: the device whose com_dict holds the deploy serial port
            source_file: path of the binary passed to the tool as -bin
        Raises:
            LiteDeviceError: if the deploy serial port config is missing or
                             has no trailing port number, if the burn tool
                             cannot be copied, or if the tool fails
        """
        burn_tool_name = "HiBurn.exe" if os.name == "nt" else "HiBurn"
        burn_tool_path = get_file_absolute_path(
            os.path.join("tools", burn_tool_name), self.paths)

        deploy_com = device.device.com_dict.get(ComType.deploy_com)
        if deploy_com is None:
            raise LiteDeviceError(
                "The config of serial port to deploy is missing",
                error_no="00108")
        deploy_serial_port = deploy_com.serial_port
        deploy_baudrate = deploy_com.baud_rate
        # The tool takes the bare port number, i.e. the trailing digits of
        # the configured port name.
        port_number = re.findall(r'\d+$', str(deploy_serial_port))
        if not port_number:
            raise LiteDeviceError("The config of serial port {} to deploy is "
                                  "invalid".format(deploy_serial_port),
                                  error_no="00108")

        new_temp_tool_path = self.copy_file_as_temp(burn_tool_path, 10)
        if not new_temp_tool_path:
            # copy_file_as_temp returns None when the tool is not a file.
            raise LiteDeviceError(
                "The burn tool {} does not exist".format(burn_tool_path),
                error_no="00108")

        # An argument list (no shell) keeps paths containing spaces intact.
        cmd = [new_temp_tool_path,
               '-com:{}'.format(port_number[0]),
               '-bin:{}'.format(source_file),
               '-signalbaud:{}'.format(deploy_baudrate)]
        LOG.info('The running cmd is {}'.format(" ".join(cmd)))
        LOG.info('The burn tool is running, please wait..')
        try:
            try:
                result = subprocess.run(cmd, stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        universal_newlines=True)
                return_code, out = result.returncode, result.stdout
            except OSError as error:
                # The tool could not be started; treat it as a failed burn.
                return_code, out = -1, str(error)
        finally:
            # Always drop the temporary copy, even if the run blew up.
            os.remove(new_temp_tool_path)
        LOG.info(
            'Deploy kit to execute burn tool finished with return_code: {} '
            'output: {}'.format(return_code, out))
        if return_code != 0:
            device.device_allocation_state = DeviceAllocationState.unusable
            raise LiteDeviceError("%s port set_up wifiiot failed" %
                                  deploy_serial_port, error_no="00402")

    def __setup__(self, device, **kwargs):
        """
        Execute reset command on the device by cmd serial port and then upload
        patch file by deploy tool.
        Parameters:
            device: the instance of LocalController with one or more
                    ComController
            kwargs: may carry 'source_file', the path of the file to upload
        """
        source_file = kwargs.get("source_file", None)
        self._reset(device)
        self._send_file(device, source_file)

    def __teardown__(self, device):
        """
        Nothing to clean up for this kit.
        """
        pass

    def copy_file_as_temp(self, original_file, str_length):
        """
        Copy a file next to itself under a name with a random suffix.
        The copy is named '<root>_<random string><extension>'.
        Parameters:
            original_file : the original file path
            str_length: the length of random string
        Returns:
            the path of the copy, or None if original_file is not an
            existing file
        """
        if not original_file or not os.path.isfile(original_file):
            return None
        # random.choice per character works for any length, whereas
        # random.sample fails once str_length exceeds the alphabet size.
        alphabet = string.ascii_letters + string.digits
        random_str = "".join(
            random.choice(alphabet) for _ in range(str_length))
        root, extension = os.path.splitext(original_file)
        new_temp_tool_path = '{}_{}{}'.format(root, random_str, extension)
        # shutil.copy keeps the permission bits, so a copied executable stays
        # executable on non-Windows hosts.
        return shutil.copy(original_file, new_temp_tool_path)