1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import random 21import re 22import string 23import subprocess 24import shutil 25import platform 26import glob 27import time 28import sys 29from xdevice import Plugin 30from xdevice import platform_logger 31from xdevice import DeviceAllocationState 32from xdevice import ParamError 33from xdevice import ITestKit 34from xdevice import get_config_value 35from xdevice import get_file_absolute_path 36from xdevice import UserConfigManager 37from xdevice import ConfigConst 38from xdevice import get_local_ip 39from xdevice import FilePermission 40from xdevice import DeviceTestType 41from xdevice import DeviceLabelType 42from ohos.exception import LiteDeviceConnectError 43from ohos.exception import LiteDeviceError 44from ohos.exception import LiteDeviceMountError 45from ohos.constants import ComType 46from ohos.constants import CKit 47from ohos.constants import DeviceLiteKernel 48from ohos.utils import parse_strings_key_value 49 50 51__all__ = ["DeployKit", "MountKit", "RootFsKit", "QueryKit", "LiteShellKit", 52 "LiteAppInstallKit", "DeployToolKit"] 53LOG = platform_logger("KitLite") 54RESET_CMD = "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, 0x61, 0x94" 55 56 57def execute_query(device, query): 58 if not query: 59 LOG.debug("query bin is none") 60 return 61 if device.device_props: 62 LOG.debug("query bin has been executed") 63 return 64 LOG.debug("execute query bin begins") 65 if device.__get_device_kernel__() == DeviceLiteKernel.linux_kernel: 66 command = f"chmod +x /storage{query} && /storage{query}" 67 else: 68 command = f".{query}" 69 output, _, _ = device.execute_command_with_timeout(command=command, timeout=10) 70 LOG.debug(output) 71 params = parse_strings_key_value(output) 72 device.update_device_props(params) 73 LOG.debug("execute query bin ends") 74 75 76@Plugin(type=Plugin.TEST_KIT, id=CKit.deploy) 77class DeployKit(ITestKit): 78 def __init__(self): 79 self.burn_file = "" 80 self.burn_command = "" 81 self.timeout = "" 82 self.paths = "" 83 84 def __check_config__(self, config): 85 self.timeout = str(int(get_config_value( 86 'timeout', config, is_list=False, default=0)) // 1000) 87 self.burn_file = get_config_value('burn_file', config, is_list=False) 88 burn_command = get_config_value('burn_command', config, is_list=False, 89 default=RESET_CMD) 90 self.burn_command = burn_command.replace(" ", "").split(",") 91 self.paths = get_config_value('paths', config) 92 if self.timeout == "0" or not self.burn_file: 93 msg = "The config for deploy kit is invalid with timeout:{}, " \ 94 "burn_file:{}".format(self.timeout, self.burn_file) 95 raise ParamError(msg, error_no="00108") 96 97 def _reset(self, device): 98 cmd_com = device.device.com_dict.get(ComType.cmd_com) 99 try: 100 cmd_com.connect() 101 cmd_com.execute_command( 102 command='AT+RST={}'.format(self.timeout)) 103 cmd_com.close() 104 except (LiteDeviceConnectError, IOError) as error: 105 device.device_allocation_state = DeviceAllocationState.unusable 106 LOG.error( 107 "The exception {} happened in deploy kit running".format( 108 error), error_no=getattr(error, "error_no", 109 "00000")) 110 raise LiteDeviceError("%s port set_up wifiiot failed" % 111 cmd_com.serial_port, 112 error_no=getattr(error, "error_no", 113 "00000")) 114 finally: 115 if cmd_com: 116 cmd_com.close() 117 118 def _send_file(self, device): 119 burn_tool_name = "HiBurn.exe" if os.name == "nt" else "HiBurn" 120 burn_tool_path = get_file_absolute_path( 121 os.path.join("tools", burn_tool_name), self.paths) 122 patch_file = get_file_absolute_path(self.burn_file, self.paths) 123 deploy_serial_port = device.device.com_dict.get( 124 ComType.deploy_com).serial_port 125 deploy_baudrate = device.device.com_dict.\ 126 get(ComType.deploy_com).baud_rate 127 port_number = re.findall(r'\d+$', deploy_serial_port) 128 if not port_number: 129 raise LiteDeviceError("The config of serial port {} to deploy is " 130 "invalid".format(deploy_serial_port), 131 error_no="00108") 132 new_temp_tool_path = copy_file_as_temp(burn_tool_path, 10) 133 cmd = '{} -com:{} -bin:{} -signalbaud:{}' \ 134 .format(new_temp_tool_path, port_number[0], patch_file, 135 deploy_baudrate) 136 LOG.info('The running cmd is {}'.format(cmd)) 137 LOG.info('The burn tool is running, please wait..') 138 return_code, out = subprocess.getstatusoutput(cmd) 139 LOG.info( 140 'Deploy kit to execute burn tool finished with return code: {} ' 141 'output: {}'.format(return_code, out)) 142 os.remove(new_temp_tool_path) 143 if 0 != return_code: 144 device.device_allocation_state = DeviceAllocationState.unusable 145 raise LiteDeviceError("%s port set_up wifiiot failed" % 146 deploy_serial_port, error_no="00402") 147 148 def __setup__(self, device, **kwargs): 149 """ 150 Execute reset command on the device by cmd serial port and then upload 151 patch file by deploy tool. 152 Parameters: 153 device: the instance of LocalController with one or more 154 ComController 155 """ 156 del kwargs 157 self._reset(device) 158 self._send_file(device) 159 160 def __teardown__(self, device): 161 pass 162 163 164@Plugin(type=Plugin.TEST_KIT, id=CKit.mount) 165class MountKit(ITestKit): 166 def __init__(self): 167 self.remote = None 168 self.paths = "" 169 self.mount_list = [] 170 self.mounted_dir = set() 171 self.server = "" 172 self.file_name_list = [] 173 self.remote_info = None 174 175 def __check_config__(self, config): 176 self.remote = get_config_value('server', config, is_list=False) 177 self.paths = get_config_value('paths', config) 178 self.mount_list = get_config_value('mount', config, is_list=True) 179 self.server = get_config_value('server', config, is_list=False, 180 default="NfsServer") 181 if not self.mount_list: 182 msg = "The config for mount kit is invalid with mount:{}" \ 183 .format(self.mount_list) 184 LOG.error(msg, error_no="00108") 185 raise TypeError("Load Error[00108]") 186 187 def mount_on_board(self, device=None, remote_info=None, case_type=""): 188 """ 189 Init the environment on the device server, e.g. mount the testcases to 190 server 191 192 Parameters: 193 device: DeviceLite, device lite on local or remote 194 remote_info: dict, includes 195 linux_host: str, nfs_server ip 196 linux_directory: str, the directory on the linux 197 is_remote: str, server is remote or not 198 case_type: str, CppTestLite or CTestLite, default value is 199 DeviceTestType.cpp_test_lite 200 201 Returns: 202 True or False, represent init Failed or success 203 """ 204 if not remote_info: 205 raise ParamError("failed to get server environment", 206 error_no="00108") 207 208 linux_host = remote_info.get("ip", "") 209 linux_directory = remote_info.get("dir", "") 210 is_remote = remote_info.get("remote", "false") 211 liteos_commands = ["cd /", "umount device_directory", 212 "mount nfs_ip:nfs_directory device" 213 "_directory nfs"] 214 linux_commands = ["cd /{}".format("storage"), 215 "umount -f /{}/{}".format("storage", "device_directory"), 216 "toybox mount -t nfs -o nolock,addr=nfs_ip nfs_ip:nfs_directory " 217 "/{}/{}".format("storage", "device_directory"), 218 "chmod 755 -R /{}/{}".format( 219 "storage", "device_directory")] 220 if not linux_host or not linux_directory: 221 raise LiteDeviceMountError( 222 "nfs server miss ip or directory[00108]", error_no="00108") 223 224 commands = [] 225 if device.label == "ipcamera": 226 env_result, status, _ = device.execute_command_with_timeout( 227 command="uname", timeout=1, retry=2) 228 if status: 229 if env_result.find(DeviceLiteKernel.linux_kernel) != -1 or \ 230 env_result.find("Linux") != -1: 231 commands = linux_commands 232 device.__set_device_kernel__(DeviceLiteKernel.linux_kernel) 233 else: 234 commands = liteos_commands 235 device.__set_device_kernel__(DeviceLiteKernel.lite_kernel) 236 else: 237 raise LiteDeviceMountError("failed to get device env[00402]", 238 error_no="00402") 239 240 for mount_file in self.mount_list: 241 target = mount_file.get("target", "/test_root") 242 if target in self.mounted_dir: 243 LOG.debug("%s is mounted" % target) 244 continue 245 if target == "/test_root/tools" and device.device_props: 246 LOG.debug("query bin has been executed, '/test_root/tools' no need to mount again") 247 continue 248 mkdir_on_board(device, target) 249 250 temp_linux_directory = linux_directory 251 if is_remote.lower() == "false": 252 temp_linux_directory = get_mount_dir(linux_directory) 253 for command in commands: 254 command = command.replace("nfs_ip", linux_host). \ 255 replace("nfs_directory", temp_linux_directory).replace( 256 "device_directory", target).replace("//", "/") 257 timeout = 15 if command.startswith("mount") else 1 258 if command.startswith("mount"): 259 self.mounted_dir.add(target) 260 for mount_time in range(1, 4): 261 result, status, _ = device.\ 262 execute_command_with_timeout(command=command, 263 case_type=case_type, 264 timeout=timeout) 265 if status: 266 break 267 if "already mounted" in result: 268 LOG.info("{} is mounted".format(target)) 269 break 270 LOG.info("Mount failed,try " 271 "again {} time".format(mount_time)) 272 if mount_time == 3: 273 raise LiteDeviceMountError("Failed to mount the " 274 "device[00402]", 275 error_no="00402") 276 else: 277 result, status, _ = device.execute_command_with_timeout( 278 command=command, case_type=case_type, timeout=timeout) 279 LOG.info('Prepare environment success') 280 execute_query(device, '/test_root/tools/querySmall.bin') 281 282 def __setup__(self, device, **kwargs): 283 """ 284 Mount the file to the board by the nfs server. 285 """ 286 LOG.debug("Start mount kit setup") 287 288 request = kwargs.get("request", None) 289 if not request: 290 raise ParamError("MountKit setup request is None", 291 error_no="02401") 292 device.connect() 293 294 config_manager = UserConfigManager( 295 config_file=request.get(ConfigConst.configfile, ""), 296 env=request.get(ConfigConst.test_environment, "")) 297 remote_info = config_manager.get_user_config("testcases/server", 298 filter_name=self.server) 299 300 copy_list = self.copy_to_server(remote_info.get("dir"), 301 remote_info.get("ip"), 302 request, request.config.testcases_path) 303 304 self.mount_on_board(device=device, remote_info=remote_info, 305 case_type=DeviceTestType.cpp_test_lite) 306 307 return copy_list 308 309 def copy_to_server(self, linux_directory, linux_host, request, 310 testcases_dir): 311 file_local_paths = [] 312 # find querySmall.bin 313 query_small_src = "resource/tools/querySmall.bin" 314 try: 315 file_path = get_file_absolute_path(query_small_src, self.paths) 316 file_local_paths.append(file_path) 317 self.mount_list.append({"source": query_small_src, "target": "/test_root/tools"}) 318 except ParamError: 319 LOG.debug("query bin is not found") 320 321 for mount_file in self.mount_list: 322 source = mount_file.get("source") 323 if not source: 324 raise TypeError("The source of MountKit cant be empty " 325 "in Test.json!") 326 source = source.replace("$testcases/", "").replace("$resources/", "") 327 file_path = get_file_absolute_path(source, self.paths) 328 if os.path.isdir(file_path): 329 for root, _, files in os.walk(file_path): 330 for _file in files: 331 if _file.endswith(".json"): 332 continue 333 file_local_paths.append(os.path.join(root, _file)) 334 else: 335 file_local_paths.append(file_path) 336 337 config_manager = UserConfigManager( 338 config_file=request.get(ConfigConst.configfile, ""), 339 env=request.get(ConfigConst.test_environment, "")) 340 remote_info = config_manager.get_user_config("testcases/server", 341 filter_name=self.server) 342 self.remote_info = remote_info 343 344 if not remote_info: 345 err_msg = "The name of remote device {} does not match". \ 346 format(self.remote) 347 LOG.error(err_msg, error_no="00403") 348 raise TypeError(err_msg) 349 is_remote = remote_info.get("remote", "false") 350 if (str(get_local_ip()) == linux_host) and ( 351 linux_directory == ("/data%s" % testcases_dir)): 352 return 353 ip = remote_info.get("ip", "") 354 port = remote_info.get("port", "") 355 remote_dir = remote_info.get("dir", "") 356 if not ip or not port or not remote_dir: 357 LOG.warning("Nfs server's ip or port or dir is empty") 358 return 359 for _file in file_local_paths: 360 # remote copy 361 LOG.info("Trying to copy the file from {} to nfs server". 362 format(_file)) 363 if not is_remote.lower() == "false": 364 try: 365 import paramiko 366 client = paramiko.Transport(ip, int(port)) 367 client.connect(username=remote_info.get("username"), 368 password=remote_info.get("password")) 369 sftp = paramiko.SFTPClient.from_transport(client) 370 sftp.put(localpath=_file, remotepath=os.path.join( 371 remote_info.get("dir"), os.path.basename(_file))) 372 client.close() 373 except (OSError, Exception) as exception: 374 msg = "copy file to nfs server failed with error {}" \ 375 .format(exception) 376 LOG.error(msg, error_no="00403") 377 # local copy 378 else: 379 for count in range(1, 4): 380 try: 381 os.remove(os.path.join(remote_info.get("dir"), 382 os.path.basename(_file))) 383 except OSError as _: 384 pass 385 shutil.copy(_file, remote_info.get("dir")) 386 if check_server_file(_file, remote_info.get("dir")): 387 break 388 else: 389 LOG.info( 390 "Trying to copy the file from {} to nfs " 391 "server {} times".format(_file, count)) 392 if count == 3: 393 msg = "Copy {} to nfs server failed {} times".format( 394 os.path.basename(_file), count) 395 LOG.error(msg, error_no="00403") 396 LOG.debug("Nfs server:{}".format(glob.glob( 397 os.path.join(remote_info.get("dir"), '*.*')))) 398 399 self.file_name_list.append(os.path.basename(_file)) 400 401 return self.file_name_list 402 403 def __teardown__(self, device): 404 if device.__get_device_kernel__() == DeviceLiteKernel.linux_kernel: 405 device.execute_command_with_timeout(command="cd /storage", 406 timeout=1) 407 for mounted_dir in self.mounted_dir: 408 device.execute_command_with_timeout(command="umount -f " 409 "/storage{}". 410 format(mounted_dir), 411 timeout=2) 412 device.execute_command_with_timeout(command="rm -r /storage{}". 413 format(mounted_dir), 414 timeout=1) 415 else: 416 device.execute_command_with_timeout(command="cd /", timeout=1) 417 for mounted_dir in self.mounted_dir: 418 for mount_time in range(1, 3): 419 result, status, _ = device.execute_command_with_timeout( 420 command="umount {}".format(mounted_dir), 421 timeout=2) 422 if result.find("Resource busy") == -1: 423 device.execute_command_with_timeout(command="rm -r {}". 424 format(mounted_dir) 425 , timeout=1) 426 if status: 427 break 428 LOG.info("Umount failed,try " 429 "again {} time".format(mount_time)) 430 time.sleep(1) 431 432 433def copy_file_as_temp(original_file, str_length): 434 """ 435 To obtain a random string with specified length 436 Parameters: 437 original_file : the original file path 438 str_length: the length of random string 439 """ 440 if os.path.isfile(original_file): 441 random_str = random.sample(string.ascii_letters + string.digits, 442 str_length) 443 new_temp_tool_path = '{}_{}{}'.format( 444 os.path.splitext(original_file)[0], "".join(random_str), 445 os.path.splitext(original_file)[1]) 446 return shutil.copyfile(original_file, new_temp_tool_path) 447 448 449def mkdir_on_board(device, dir_path): 450 """ 451 Liteos L1 board don't support mkdir -p 452 Parameters: 453 device : the L1 board 454 dir_path: the dir path to make 455 """ 456 if device.__get_device_kernel__() == DeviceLiteKernel.linux_kernel: 457 device.execute_command_with_timeout(command="cd /storage", timeout=1) 458 else: 459 device.execute_command_with_timeout(command="cd /", timeout=1) 460 for sub_dir in dir_path.split("/"): 461 if sub_dir in ["", "/"]: 462 continue 463 device.execute_command_with_timeout(command="mkdir {}".format(sub_dir), 464 timeout=1) 465 device.execute_command_with_timeout(command="cd {}".format(sub_dir), 466 timeout=1) 467 if device.__get_device_kernel__() == DeviceLiteKernel.linux_kernel: 468 device.execute_command_with_timeout(command="cd /storage", timeout=1) 469 else: 470 device.execute_command_with_timeout(command="cd /", timeout=1) 471 472 473def get_mount_dir(mount_dir): 474 """ 475 Use windows path to mount directly when the system is windows 476 Parameters: 477 mount_dir : the dir to mount that config in user_config.xml 478 such as: the mount_dir is: D:\\mount\\root 479 the mount command should be: mount ip:/d/mount/root 480 """ 481 if platform.system() == "Windows": 482 mount_dir = mount_dir.replace(":", "").replace("\\", "/") 483 _list = mount_dir.split("/") 484 if mount_dir.startswith("/"): 485 _list[1] = _list[1].lower() 486 else: 487 _list[0] = _list[0].lower() 488 mount_dir = "/".join(_list) 489 mount_dir = "/%s" % mount_dir 490 return mount_dir 491 492 493def check_server_file(local_file, target_path): 494 for file_list in glob.glob(os.path.join(target_path, '*.*')): 495 if os.path.basename(local_file) in file_list: 496 return True 497 return False 498 499 500@Plugin(type=Plugin.TEST_KIT, id=CKit.rootfs) 501class RootFsKit(ITestKit): 502 def __init__(self): 503 self.checksum_command = None 504 self.hash_file_name = None 505 self.device_label = None 506 507 def __check_config__(self, config): 508 self.checksum_command = get_config_value("command", config, 509 is_list=False) 510 self.hash_file_name = get_config_value("hash_file_name", config, 511 is_list=False) 512 self.device_label = get_config_value("device_label", config, 513 is_list=False) 514 if not self.checksum_command or not self.hash_file_name or \ 515 not self.device_label: 516 msg = "The config for rootfs kit is invalid : checksum :{}" \ 517 " hash file name:{} device label:{}" \ 518 .format(self.checksum_command, self.hash_file_name, 519 self.device_label) 520 LOG.error(msg, error_no="00108") 521 return TypeError(msg) 522 523 def __setup__(self, device, **kwargs): 524 del kwargs 525 526 # check device label 527 if not device.label == self.device_label: 528 LOG.error("Device label is not match '%s '" % "demo label", 529 error_no="00108") 530 return False 531 else: 532 report_path = self._get_report_dir() 533 if report_path and os.path.exists(report_path): 534 535 # execute command of checksum 536 device.connect() 537 device.execute_command_with_timeout( 538 command="cd /", case_type=DeviceTestType.cpp_test_lite) 539 result, _, _ = device.execute_command_with_timeout( 540 command=self.checksum_command, 541 case_type=DeviceTestType.cpp_test_lite) 542 device.close() 543 # get serial from device and then join new file name 544 pos = self.hash_file_name.rfind(".") 545 serial = "_%s" % device.__get_serial__() 546 if pos > 0: 547 hash_file_name = "".join((self.hash_file_name[:pos], 548 serial, 549 self.hash_file_name[pos:])) 550 else: 551 hash_file_name = "".join((self.hash_file_name, serial)) 552 hash_file_path = os.path.join(report_path, hash_file_name) 553 # write result to file 554 hash_file_path_open = os.open(hash_file_path, os.O_WRONLY | 555 os.O_CREAT | os.O_APPEND, 556 FilePermission.mode_755) 557 558 with os.fdopen(hash_file_path_open, mode="w") as hash_file: 559 hash_file.write(result) 560 hash_file.flush() 561 else: 562 msg = "RootFsKit teardown, log path [%s] not exists!" \ 563 % report_path 564 LOG.error(msg, error_no="00440") 565 return False 566 return True 567 568 def __teardown__(self, device): 569 pass 570 571 @staticmethod 572 def _get_report_dir(): 573 from xdevice import Variables 574 report_path = os.path.join(Variables.exec_dir, 575 Variables.report_vars.report_dir, 576 Variables.task_name) 577 return report_path 578 579 580@Plugin(type=Plugin.TEST_KIT, id=CKit.query) 581class QueryKit(ITestKit): 582 def __init__(self): 583 self.mount_kit = MountKit() 584 self.query = "" 585 self.properties = "" 586 587 def __check_config__(self, config): 588 setattr(self.mount_kit, "mount_list", 589 get_config_value('mount', config)) 590 setattr(self.mount_kit, "server", get_config_value( 591 'server', config, is_list=False, default="NfsServer")) 592 self.query = get_config_value('query', config, is_list=False) 593 self.properties = get_config_value('properties', config, is_list=False) 594 595 if not self.query: 596 msg = "The config for query kit is invalid with query:{}" \ 597 .format(self.query) 598 LOG.error(msg, error_no="00108") 599 raise TypeError(msg) 600 601 def __setup__(self, device, **kwargs): 602 LOG.debug("Start query kit setup") 603 if device.label != DeviceLabelType.ipcamera: 604 return 605 request = kwargs.get("request", None) 606 if not request: 607 raise ParamError("the request of queryKit is None", 608 error_no="02401") 609 self.mount_kit.__setup__(device, request=request) 610 execute_query(device, self.query) 611 612 def __teardown__(self, device): 613 if device.label != DeviceLabelType.ipcamera: 614 return 615 device.connect() 616 self.mount_kit.__teardown__(device) 617 device.close() 618 619 620@Plugin(type=Plugin.TEST_KIT, id=CKit.liteshell) 621class LiteShellKit(ITestKit): 622 def __init__(self): 623 self.command_list = [] 624 self.tear_down_command = [] 625 self.paths = None 626 627 def __check_config__(self, config): 628 self.command_list = get_config_value('run-command', config) 629 self.tear_down_command = get_config_value('teardown-command', config) 630 631 def __setup__(self, device, **kwargs): 632 del kwargs 633 LOG.debug("LiteShellKit setup, device:{}".format(device.device_sn)) 634 if len(self.command_list) == 0: 635 LOG.info("No setup command to run, skipping!") 636 return 637 for command in self.command_list: 638 run_command(device, command) 639 640 def __teardown__(self, device): 641 LOG.debug("LiteShellKit teardown: device:{}".format(device.device_sn)) 642 if len(self.tear_down_command) == 0: 643 LOG.info("No teardown command to run, skipping!") 644 return 645 for command in self.tear_down_command: 646 run_command(device, command) 647 648 649def run_command(device, command): 650 LOG.debug("The command:{} is running".format(command)) 651 if command.strip() == "reset": 652 device.reboot() 653 else: 654 device.execute_shell_command(command) 655 656 657@Plugin(type=Plugin.TEST_KIT, id=CKit.liteinstall) 658class LiteAppInstallKit(ITestKit): 659 def __init__(self): 660 self.app_list = "" 661 self.is_clean = "" 662 self.alt_dir = "" 663 self.bundle_name = None 664 self.paths = "" 665 self.signature = False 666 667 def __check_config__(self, options): 668 self.app_list = get_config_value('test-file-name', options) 669 self.is_clean = get_config_value('cleanup-apps', options, False) 670 self.signature = get_config_value('signature', options, False) 671 self.alt_dir = get_config_value('alt-dir', options, False) 672 if self.alt_dir and self.alt_dir.startswith("resource/"): 673 self.alt_dir = self.alt_dir[len("resource/"):] 674 self.paths = get_config_value('paths', options) 675 676 def __setup__(self, device, **kwargs): 677 del kwargs 678 LOG.debug("LiteAppInstallKit setup, device:{}". 679 format(device.device_sn)) 680 if len(self.app_list) == 0: 681 LOG.info("No app to install, skipping!") 682 return 683 684 for app in self.app_list: 685 if app.endswith(".hap"): 686 device.execute_command_with_timeout("cd /", timeout=1) 687 if self.signature: 688 device.execute_command_with_timeout( 689 command="./bin/bm set -d enable", timeout=10) 690 else: 691 device.execute_command_with_timeout( 692 command="./bin/bm set -s disable", timeout=10) 693 694 device.execute_command_with_timeout( 695 "./bin/bm install -p %s" % app, timeout=60) 696 697 def __teardown__(self, device): 698 LOG.debug("LiteAppInstallKit teardown: device:{}".format( 699 device.device_sn)) 700 if self.is_clean and str(self.is_clean).lower() == "true" \ 701 and self.bundle_name: 702 device.execute_command_with_timeout( 703 "./bin/bm uninstall -n %s" % self.bundle_name, timeout=90) 704 705 706@Plugin(type=Plugin.TEST_KIT, id=CKit.deploytool) 707class DeployToolKit(ITestKit): 708 def __init__(self): 709 self.config = None 710 self.auto_deploy = None 711 self.device_label = None 712 self.time_out = None 713 self.paths = None 714 self.upgrade_file_path = None 715 self.burn_tools = None 716 717 def __check_config__(self, config): 718 self.config = config 719 self.paths = get_config_value("paths", config) 720 self.burn_file = get_config_value("burn_file", config, is_list=False) 721 self.auto_deploy = get_config_value('auto_deploy', 722 config, is_list=False) 723 self.device_label = get_config_value("device_label", config, 724 is_list=False) 725 self.time_out = get_config_value("timeout", config, 726 is_list=False) 727 self.upgrade_file_path = get_config_value("upgrade_file_path", config, is_list=False) 728 self.burn_tools = get_config_value("burn_tools", config, is_list=False) 729 730 if not self.auto_deploy or not self.upgrade_file_path or not self.time_out: 731 msg = "The config for deploy tool kit is" \ 732 "invalid: upgrade_file_path :{} time out:{}".format( 733 self.upgrade_file_path, self.time_out) 734 LOG.error(msg, error_no="00108") 735 return TypeError(msg) 736 737 def __setup__(self, device, **kwargs): 738 LOG.info("Upgrade file path:{}".format(self.upgrade_file_path)) 739 upgrade_file_name = os.path.basename(self.upgrade_file_path) 740 if self.upgrade_file_path.startswith("resource"): 741 self.upgrade_file_path = get_file_absolute_path( 742 os.path.join("tools", upgrade_file_name), self.paths) 743 sys.path.insert(0, os.path.dirname(self.upgrade_file_path)) 744 serial_port = device.device.com_dict.get(ComType.deploy_com).serial_port 745 LOG.debug("Serial port:{}".format(serial_port)) 746 baud_rate = device.device.com_dict.get(ComType.deploy_com).baud_rate 747 usb_port = device.device.com_dict.get(ComType.cmd_com).usb_port 748 patch_file = get_file_absolute_path(self.burn_file, self.paths) 749 upgrade_name = upgrade_file_name.split(".py")[0] 750 import_cmd_str = "from {} import {} as upgrade_device".format( 751 upgrade_name, upgrade_name) 752 scope = {} 753 exec(import_cmd_str, scope) 754 upgrade_device = scope.get("upgrade_device", "None") 755 upgrade = upgrade_device(serial_port=serial_port, baud_rate=baud_rate, 756 patch_file=patch_file, usb_port=usb_port) 757 upgrade_result = upgrade.burn() 758 if upgrade_result: 759 return upgrade.reset_device() 760 return None 761 762 def __teardown__(self, device): 763 pass 764