#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 0. Runtime environment: python 3.10+, pytest
# 1. Edit the fields of GP below
# 2. pytest [-k case_name_pattern]
#    eg. pytest -k file   runs the cases whose method name contains "file"

# Packaging
# pip install pyinstaller
# prepare assert source dir includes your data files
# pyi-makespec -D --add-data assert:assert dev_hdc_test.py
# pyinstaller dev_hdc_test.spec
# run dev_hdc_test.exe

import csv
import hashlib
import json
import logging
import os
import random
import re
import stat
import shutil
import subprocess
import sys
import threading
import time
from multiprocessing import Process

import pytest
try:
    # pkg_resources is deprecated and may be missing from the environment;
    # keep the module importable without it.
    import pkg_resources
except ImportError:
    pkg_resources = None


# File (in the current working directory) where GP persists its options.
_CONF_FILE = ".hdctester.conf"

# GP attributes restored by GP.load(); the names match the keys GP.dump() writes.
_PERSISTED_FIELDS = (
    "hdc_exe",
    "local_path",
    "remote_path",
    "remote_ip",
    "remote_port",
    "hdc_head",
    "tmode",
    "device_name",
    "changed_testcase",
    "testcase_path",
    "loaded_testcase",
)


class GP(object):
    """ Global Parameters

    customize here !!!

    All state lives in class attributes and every method is a classmethod,
    so the class is used directly and never instantiated.
    """
    hdc_exe = "hdc"
    local_path = "resource"
    remote_path = "data/local/tmp"
    remote_dir_path = "data/local/tmp/it_send_dir"
    remote_ip = "auto"
    remote_port = 8710
    hdc_head = "hdc"
    device_name = ""
    targets = []
    tmode = "usb"
    changed_testcase = "n"
    testcase_path = "ts_windows.csv"
    loaded_testcase = 0
    debug_app = "com.example.myapplication"
    # Overwritten by check_rom(); the default keeps pytest_run() printable
    # when the ROM check never ran.
    hdcd_rom = "not checked"

    @classmethod
    def init(cls):
        """Configure logging, then load saved options or ask for them interactively."""
        # A second basicConfig() call would be a no-op once the root logger
        # has a handler, so logging is configured exactly once.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
            datefmt='%d %b %Y %H:%M:%S',
        )

        if os.path.exists(_CONF_FILE):
            cls.load()
            cls.start_host()
            cls.list_targets()
        else:
            cls.set_options()
            cls.print_options()
            cls.start_host()
            cls.dump()
        return

    @classmethod
    def start_host(cls):
        """Run ``<hdc_exe> start`` and return its exit status."""
        cmd = f"{cls.hdc_exe} start"
        return subprocess.call(cmd.split())

    @classmethod
    def list_targets(cls):
        """Fill ``cls.targets`` from ``<hdc_exe> list targets``.

        Returns:
            True when the command ran; False when it could not be run, in
            which case ``cls.targets`` holds a single failure marker string.
        """
        try:
            raw = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
        except (OSError, IndexError, subprocess.CalledProcessError):
            cls.targets = ["failed to auto detect device"]
            return False
        cls.targets = [t.decode() for t in raw]
        return True

    @classmethod
    def get_device(cls):
        """Select the device to test and build ``cls.hdc_head`` for it.

        With several targets the user is asked to pick one by its 1-based
        number. Returns False when no usable device is available or the
        selection is invalid.
        """
        cls.start_host()
        cls.list_targets()
        if not cls.targets:
            print("No hdc device detected.")
            return False
        if len(cls.targets) > 1:
            print("Multiple device detected, please select one:")
            for i, t in enumerate(cls.targets):
                print(f"{i+1}. {t}")
            print("input the nums of the device above:")
            try:
                choice = int(input())
            except ValueError:
                choice = 0
            # Reject 0 and negatives too: they would silently index from the end.
            if not 1 <= choice <= len(cls.targets):
                print("Invalid device number.")
                return False
            cls.device_name = cls.targets[choice - 1]
        else:
            cls.device_name = cls.targets[0]
        if cls.device_name == "failed to auto detect device":
            print("No device detected, please check your device connection")
            return False
        # Compared case-insensitively in case hdc capitalises the marker.
        elif cls.device_name.lower() == "[empty]":
            print("No hdc device detected.")
            return False
        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
        return True

    @classmethod
    def dump(cls):
        """Write every public, non-method class attribute to the config file as JSON."""
        content = {
            key: value
            for key, value in cls.__dict__.items()
            if not key.startswith("__") and not isinstance(value, classmethod)
        }
        # O_TRUNC makes sure a longer, older config cannot leave trailing bytes.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        with os.fdopen(os.open(_CONF_FILE, flags, 0o755), "w") as conf:
            conf.write(json.dumps(content))
        return True

    @classmethod
    def load(cls):
        """Restore the persisted options from the config file.

        A key missing from the file leaves the current value untouched
        instead of replacing it with None.
        """
        with open(_CONF_FILE) as f:
            content = json.load(f)
        for field in _PERSISTED_FIELDS:
            if content.get(field) is not None:
                setattr(cls, field, content[field])
        return True

    @classmethod
    def print_options(cls):
        """Print the current options as a right-aligned table."""
        rows = (
            ("hdc execution", cls.hdc_exe),
            ("local storage path", cls.local_path),
            ("remote storage path", cls.remote_path),
            ("remote ip", cls.remote_ip),
            ("remote port", cls.remote_port),
            ("device name", cls.device_name),
            ("connect type", cls.tmode),
            ("hdc head", cls.hdc_head),
            ("changed testcase", cls.changed_testcase),
            ("testcase path", cls.testcase_path),
            ("loaded testcase", cls.loaded_testcase),
        )
        info = "HDC Tester Default Options: \n\n" + "".join(
            f"{label.rjust(20, ' ')}: {value}\n" for label, value in rows
        )
        print(info)

    @classmethod
    def tconn_tcp(cls):
        """Run ``tconn <ip>:<port>`` and report whether the output contains "Connect OK"."""
        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
        return "Connect OK" in res

    @classmethod
    def set_options(cls):
        """Interactively override the defaults; an empty answer keeps the current value.

        Returns:
            False when tcp mode was chosen and the connection failed, True otherwise.
        """
        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
            cls.hdc_exe = opt
        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
            cls.local_path = opt
        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
            cls.remote_path = opt
        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
            cls.remote_ip = opt
        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
            try:
                cls.remote_port = int(opt)
            except ValueError:
                print(f"invalid port {opt}, keep {cls.remote_port}")
        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
            cls.device_name = opt
        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
            cls.tmode = opt
        if cls.tmode == "usb":
            ret = cls.get_device()
            if ret:
                print("USB device detected.")
        elif cls.tconn_tcp():
            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
        else:
            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
            return False
        return True

    @classmethod
    def change_testcase(cls):
        """Ask whether to use a custom testcase file; returns False when the answer is "n"."""
        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
            cls.changed_testcase = opt
            if opt == "n":
                return False
        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
            cls.testcase_path = os.path.join(opt)
        cls.print_options()
        return True

    @classmethod
    def load_testcase(cls):
        """Placeholder: custom testcase loading is not implemented; always returns False."""
        print("this fuction will coming soon.")
        return False

    @classmethod
    def get_version(cls):
        """Return the version string of this test script."""
        return "v1.0.4a"


def pytest_run(args):
    """Run the pytest suite ``args.count`` times and print a summary.

    Args:
        args: object with ``count``, ``verbose`` and ``desc`` attributes
            (presumably an argparse namespace - confirm against the caller).

    Returns early, without running anything, when one of the required
    install packages is missing from ``GP.local_path``.
    """
    required_packages = (
        "entry-default-signed-debug.hap",
        "libA_v10001.hsp",
        "libB_v10001.hsp",
    )
    for file in required_packages:
        if not os.path.exists(os.path.join(GP.local_path, file)):
            print(f"No {file} File!")
            print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。")
            print("Please unzip package.zip to resource directory, please rerun after operation.")
            input("[ENTER]")
            return
    gen_package_dir()
    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    if args.count is not None:
        for i in range(args.count):
            # Show a 1-based round number so the last round reads "count/count".
            print(f"------------The {i + 1}/{args.count} Test-------------")
            pytest_args = [
                '--verbose', args.verbose,
                '--report=report.html',
                f"--title=HDC Test Report {GP.get_version()}",
                '--tester=tester001',
                '--template=1',
                f"--desc={args.verbose}:{args.desc}",
            ]
            pytest.main(pytest_args)
    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
    report_dir = os.path.join(os.getcwd(), "reports")
    report_file = os.path.join(report_dir, f"{report_time}report.html")
    # hdcd_rom only exists once check_rom() ran; fall back instead of crashing.
    hdcd_rom = getattr(GP, "hdcd_rom", "not checked")
    print(f"Test over, the script version is {GP.get_version()},"
          f" start at {start_time}, end at {end_time} \n"
          f"=======>The device hdcd ROM size is {hdcd_rom}.\n"
          f"=======>{report_file} is saved. \n"
          )
    input("=======>press [Enter] key to Show logs.")


def rmdir(path):
    """Remove a file or a whole directory tree; a missing path is not an error.

    Uses the standard library on every platform, so paths containing
    spaces work and no external ``rm`` is needed.
    """
    try:
        if os.path.isfile(path) or os.path.islink(path):
            os.remove(path)
        elif os.path.isdir(path):
            shutil.rmtree(path)
    except OSError:
        print(f"Error: {path} : cannot remove")


def get_local_path(path):
    """Return ``path`` placed under the local resource directory."""
    return os.path.join(GP.local_path, path)


def get_remote_path(path):
    """Return ``path`` placed under the remote storage directory."""
    return f"{GP.remote_path}/{path}"


def get_local_md5(local):
    """Return the hex MD5 digest of the local file, read in 4 KiB chunks."""
    md5_hash = hashlib.md5()
    with open(local, "rb") as f:
        while chunk := f.read(4096):
            md5_hash.update(chunk)
    return md5_hash.hexdigest()


def check_shell_any_device(cmd, pattern=None, fetch=False):
    """Run ``cmd`` as given (no hdc head is prepended).

    Returns:
        A ``(ok, output)`` tuple: with ``pattern`` ``ok`` tells whether the
        pattern occurs in the output; with ``fetch`` ``ok`` is True;
        otherwise ``ok`` reflects the exit status and ``output`` is "".
    """
    print(f"\nexecuting command: {cmd}")
    if pattern:  # check output valid
        print("pattern case")
        output = subprocess.check_output(cmd.split()).decode()
        res = pattern in output
        print(f"--> output: {output}")
        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
        return res, output
    elif fetch:
        output = subprocess.check_output(cmd.split()).decode()
        print(f"--> output: {output}")
        return True, output
    else:  # check cmd run successfully
        print("other case")
        return subprocess.check_call(cmd.split()) == 0, ""


def check_shell(cmd, pattern=None, fetch=False, head=None):
    """Run an hdc sub-command with ``head`` (default ``GP.hdc_head``) prepended.

    Returns:
        With ``pattern``: whether the pattern occurs in the output.
        With ``fetch``: the decoded output.
        Otherwise: True when the command exits with status 0.
    """
    if head is None:
        head = GP.hdc_head
    cmd = f"{head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    if pattern:  # check output valid
        output = subprocess.check_output(cmd.split()).decode()
        res = pattern in output
        print(f"--> output: {output}")
        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
        return res
    elif fetch:
        output = subprocess.check_output(cmd.split()).decode()
        print(f"--> output: {output}")
        return output
    else:  # check cmd run successfully
        return subprocess.check_call(cmd.split()) == 0


def get_shell_result(cmd, pattern=None, fetch=False):
    """Run an hdc sub-command and return its decoded output.

    ``pattern`` and ``fetch`` are accepted for signature compatibility and ignored.
    """
    cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    return subprocess.check_output(cmd.split()).decode()


def check_rate(cmd, expected_rate):
    """Return True when the ``rate:<n>kB/s`` value printed by ``cmd`` exceeds ``expected_rate``."""
    send_result = get_shell_result(cmd)
    if "rate:" not in send_result:
        print(f"no transfer rate found in output: {send_result}")
        return False
    rate = float(send_result.split("rate:")[1].split("kB/s")[0])
    return rate > expected_rate


def check_dir(local, remote, is_single_dir=False):
    """Compare the MD5 of every file under ``remote`` with its local counterpart.

    The remote digests come from ``find <remote> -type f -exec md5sum``.
    Each remote file name is mapped to a local path: below ``local`` when
    ``is_single_dir`` is set, otherwise below ``GP.local_path``.

    Returns:
        True when every listed file matches, False otherwise.
    """
    def _calculate_md5(file_path):
        # Unreadable or missing files yield a marker that never equals a digest.
        try:
            return get_local_md5(file_path)
        except PermissionError:
            return "PermissionError"
        except FileNotFoundError:
            return "FileNotFoundError"

    print("remote:" + remote)
    # "\\;" is the literal backslash-semicolon that terminates find's -exec.
    cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \\;'
    output = subprocess.check_output(cmd.split()).decode()
    print(output)

    all_matched = True
    for line in output.splitlines():
        if len(line) < 32:  # length of MD5
            continue
        fields = line.split()
        if len(fields) < 2:  # not a "<md5> <name>" line
            continue
        expected_md5, file_name = fields[:2]
        if is_single_dir:
            file_name = file_name.replace(f"{remote}", "")
        elif GP.remote_path in remote:
            file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\")
        else:
            file_name = file_name.split(remote)[1].replace("/", "\\")
        # Build the full local file path.
        local_root = local if is_single_dir else GP.local_path
        file_path = os.path.join(os.getcwd(), local_root) + file_name
        print(file_path)
        actual_md5 = _calculate_md5(file_path)
        print(f"Expected: {expected_md5}")
        print(f"Actual: {actual_md5}")
        if actual_md5 == expected_md5:
            print(f"MD5 matched {file_name}")
        else:
            print(f"[Fail]MD5 mismatch for {file_name}")
            all_matched = False

    return all_matched


def _check_file(local, remote, head=None):
    """Verify a transferred file.

    Remote paths under ``/proc`` cannot be hashed reliably, so for them the
    local copy only has to be non-empty; everything else is compared by MD5.
    """
    if head is None:
        head = GP.hdc_head
    if remote.startswith("/proc"):
        return os.path.getsize(local) > 0
    cmd = f"shell md5sum {remote}"
    local_md5 = get_local_md5(local)
    return check_shell(cmd, local_md5, head=head)


def _check_app_installed(bundle, is_shared=False):
    """Return True when ``bundle`` appears in ``bm dump -a`` (``dump-shared`` for shared bundles)."""
    dump = "dump-shared" if is_shared else "dump"
    cmd = f"shell bm {dump} -a"
    return check_shell(cmd, bundle)


def check_hdc_targets():
    """Return True when the selected device shows up in ``list targets``."""
    # check_shell() prepends GP.hdc_head itself, so pass the bare sub-command.
    cmd = "list targets"
    print(GP.device_name)
    return check_shell(cmd, GP.device_name)


def check_file_send(local, remote):
    """Send ``local`` (relative to the resource dir) to ``remote`` and verify the copy."""
    local_path = os.path.join(GP.local_path, local)
    remote_path = f"{GP.remote_path}/{remote}"
    cmd = f"file send {local_path} {remote_path}"
    return check_shell(cmd) and _check_file(local_path, remote_path)


def check_file_recv(remote, local):
    """Receive ``remote`` into ``local`` (relative to the resource dir) and verify the copy."""
    local_path = os.path.join(GP.local_path, local)
    remote_path = f"{GP.remote_path}/{remote}"
    cmd = f"file recv {remote_path} {local_path}"
    return check_shell(cmd) and _check_file(local_path, remote_path)


def check_app_install(app, bundle, args=""):
    """Install ``app`` and check the outcome.

    Installing a .hap with ``-s`` or a .hsp without options is expected to
    fail; every other combination must succeed and list ``bundle`` afterwards.
    """
    app = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {app}"
    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
        return check_shell(install_cmd, "failed to install bundle")
    return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)


def check_app_uninstall(bundle, args=""):
    """Uninstall ``bundle`` and verify that it is no longer listed."""
    uninstall_cmd = f"uninstall {args} {bundle}"
    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)


def check_app_install_multi(tables, args=""):
    """Install several packages with one command.

    Args:
        tables: mapping of package file name -> bundle name.
        args: extra install options.

    Mixing .hap and .hsp, installing a .hap with ``-s``, or installing
    without any .hap and without options is expected to fail.
    """
    apps = [os.path.join(GP.local_path, app) for app in tables]
    bundles = list(tables.values())

    apps_str = " ".join(apps)
    install_cmd = f"install {args} {apps_str}"

    # Plain substring tests: the suffixes are literals, not regex patterns.
    has_hap = ".hap" in apps_str
    has_hsp = ".hsp" in apps_str
    expect_failure = (args == "-s" and has_hap) or (has_hsp and has_hap) or (args == "" and not has_hap)
    if expect_failure:
        return bool(check_shell(install_cmd, "failed to install bundle"))

    if not check_shell(install_cmd, "successfully"):
        return False
    for bundle in bundles:
        if not _check_app_installed(bundle, "s" in args):
            return False
    return True


def check_app_uninstall_multi(tables, args=""):
    """Uninstall every bundle of ``tables``; stops at the first failure."""
    for bundle in tables.values():
        if not check_app_uninstall(bundle, args):
            return False

    return True


def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args):
    """Run an hdc sub-command and verify its effect according to its kind.

    * ``file send`` / ``file recv``: the transfer must finish and the MD5 of
      the file (or of every file in the directory) must match.
    * ``install`` / ``uninstall``: the command must succeed and the bundle
      (keyword ``bundle`` for install) must be present / absent afterwards.
    * anything else: delegated to check_shell() with ``pattern`` and ``args``.
    """
    if head is None:
        head = GP.hdc_head
    if cmd.startswith("file"):
        if not check_shell(cmd, "FileTransfer finish", head=head):
            return False
        if cmd.startswith("file send"):
            local, remote = cmd.split()[-2:]
        else:
            remote, local = cmd.split()[-2:]
        if "-b" in cmd:
            # With -b the remote path is resolved inside the debug app's directory.
            mnt_debug_path = "mnt/debug/100/debug_hap/"
            remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}"
        if os.path.isfile(local):
            return _check_file(local, remote, head=head)
        return check_dir(local, remote, is_single_dir=is_single_dir)
    elif cmd.startswith("install"):
        bundle = args.get("bundle", "invalid")
        opt = " ".join(cmd.split()[1:-1])
        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)

    elif cmd.startswith("uninstall"):
        bundle = cmd.split()[-1]
        opt = " ".join(cmd.split()[1:-1])
        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)

    else:
        return check_shell(cmd, pattern, head=head, **args)


def check_soft_local(local_source, local_soft, remote):
    """Send a local symlink and verify the remote copy against the link's source file."""
    cmd = f"file send {local_soft} {remote}"
    if not check_shell(cmd, "FileTransfer finish"):
        return False
    return _check_file(local_source, remote)


def check_soft_remote(remote_source, remote_soft, local_recv):
    """Create a remote symlink, receive it and verify the local copy against the link's source."""
    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
    cmd = f"file recv {remote_soft} {local_recv}"
    if not check_shell(cmd, "FileTransfer finish"):
        return False
    return _check_file(local_recv, get_remote_path(remote_source))


def switch_usb():
    """Switch the device to usb mode and point ``GP.hdc_head`` at the usb device."""
    res = check_hdc_cmd("tmode usb")
    time.sleep(3)  # give the connection time to come back after the mode switch
    if res:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
    return res


def copy_file(src, dst):
    """Copy ``src`` to ``dst`` with metadata; failures are printed, not raised."""
    try:
        shutil.copy2(src, dst)
        print(f"File copied successfully from {src} to {dst}")
    except IOError as e:
        print(f"Unable to copy file. {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


def switch_tcp():
    """Switch the device to tcp mode and connect to it.

    With ``GP.remote_ip == "auto"`` the address is taken from the device's
    ``ifconfig`` output. Returns True when the tcp check is skipped (no ip
    configured or none found), otherwise the result of ``tconn``.
    """
    if not GP.remote_ip:  # skip tcp check
        print("!!! remote_ip is none, skip tcp check !!!")
        return True
    if GP.remote_ip == "auto":
        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
        # The address is expected after the first ':' (e.g. "inet addr:1.2.3.4").
        if not ipconf or ":" not in ipconf:
            print("!!! device ip not found, skip tcp check !!!")
            return True
        GP.remote_ip = ipconf.split(":")[1].split()[0]
        print(f"fetch remote ip: {GP.remote_ip}")
    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
    if ret:
        time.sleep(3)
    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
    if res:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
    return res


def select_cmd():
    """Show the main menu until a digit from 1 to 5 is entered and return it as a string."""
    msg = "1) Proceed tester\n" \
        + "2) Customize tester\n" \
        + "3) Setup files for transfer\n" \
        + "4) Load custom testcase(default unused) \n" \
        + "5) Exit\n" \
        + ">> "

    while True:
        opt = input(msg).strip()
        if opt in ("1", "2", "3", "4", "5"):
            return opt


def _open_for_rewrite(path):
    """Open ``path`` for binary writing, creating it or truncating old content.

    O_BINARY (Windows only) stops the C runtime from translating newline
    bytes, which would change the size of generated files.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    return os.fdopen(os.open(path, flags, 0o755), "wb")


def gen_file(path, size):
    """Write random data to ``path`` in 1 KiB blocks until at least ``size`` bytes are written.

    ``size`` is therefore rounded up to a multiple of 1024; 0 gives an empty file.
    """
    path = os.path.abspath(path)
    with _open_for_rewrite(path) as out:
        written = 0
        while written < size:
            out.write(os.urandom(1024))
            written += 1024


def gen_zero_file(path, size):
    """Write ``size`` ASCII '0' characters to ``path``."""
    with _open_for_rewrite(path) as out:
        out.write(b'0' * size)


def create_file_with_size(path, size):
    """Write ``size`` NUL bytes to ``path``."""
    with _open_for_rewrite(path) as out:
        out.write(b'\0' * size)


def gen_file_set():
    """Generate the whole set of local files and directories used by the transfer tests."""
    print("generating empty file ...")
    gen_file(os.path.join(GP.local_path, "empty"), 0)

    print("generating small file ...")
    gen_file(os.path.join(GP.local_path, "small"), 102400)

    print("generating medium file ...")
    gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2)

    print("generating large file ...")
    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)

    print("generating soft link ...")
    try:
        os.symlink("small", os.path.join(GP.local_path, "soft_small"))
    except FileExistsError:
        print("soft_small already exists")
    except OSError as err:
        # Creating symlinks can be refused by the OS; keep generating the rest.
        print(f"cannot create soft_small: {err}")

    print("generating text file ...")
    gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)

    print("generating package dir ...")
    package_dir = os.path.join(GP.local_path, "package")
    if not os.path.exists(package_dir):
        os.mkdir(package_dir)
    for i in range(1, 6):
        gen_file(os.path.join(package_dir, f"fake.hap.{i}"), 20 * 1024 ** 2)

    print("generating deep dir ...")
    depth = 4
    deep_path = os.path.join(GP.local_path, "deep_dir")
    if not os.path.exists(deep_path):
        os.mkdir(deep_path)
    for level in range(depth):
        deep_path = os.path.join(deep_path, f"deep_dir{level}")
        if not os.path.exists(deep_path):
            os.mkdir(deep_path)
    gen_file(os.path.join(deep_path, "deep"), 102400)

    print("generating dir with file ...")
    dir_path = os.path.join(GP.local_path, "problem_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)
    gen_file(os.path.join(dir_path, "small2"), 102400)

    fuzz_count = 47  # 47 is the count that circulated the file transfer
    data_unit = 1024  # 1024 is the size that circulated the file transfer
    data_extra = 936  # 936 is the size that cased the extra file transfer
    for i in range(fuzz_count):
        file_size = i * data_unit + data_extra
        create_file_with_size(os.path.join(dir_path, f"file_{file_size}.dat"), file_size)

    print("generating empty dir ...")
    dir_path = os.path.join(GP.local_path, "empty_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)

    # The empty file named after the version marks the resource dir as prepared.
    print("generating version file ...")
    gen_file(os.path.join(GP.local_path, GP.get_version()), 0)


def gen_package_dir():
    """Recreate ``app_dir`` and copy the debug hap into it."""
    print("generating app dir ...")
    dir_path = os.path.join(GP.local_path, "app_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)
    app = os.path.join(GP.local_path, "entry-default-signed-debug.hap")
    if not os.path.exists(app):
        print(f"Source file {app} does not exist.")
    else:
        copy_file(app, dir_path)
def prepare_source():
    """Regenerate the local resource directory unless it is already prepared.

    The directory counts as prepared when it contains the version marker
    file. Otherwise everything except .hap/.hsp packages is removed and the
    test file set is generated again.
    """
    if os.path.exists(os.path.join(GP.local_path, GP.get_version())):
        print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.")
        return
    print(f"in prepare {GP.local_path},wait for 2 mins.")

    if os.path.exists(GP.local_path):
        # Walk local_path and delete everything except the hap / hsp packages.
        for file in os.listdir(GP.local_path):
            if file.endswith((".hap", ".hsp")):
                continue
            rmdir(os.path.join(GP.local_path, file))
    else:
        os.mkdir(GP.local_path)

    gen_file_set()


def _make_deep_test_dir(deep_path):
    """Create ``deep_path`` with a chain of nested dirs and a 100 KiB file at the bottom."""
    print("generating deep test dir ...")
    absolute_path = os.path.abspath(__file__)
    # NOTE(review): the depth is derived from the script path length with a
    # modulo; a floor division may have been intended - confirm before changing.
    depth = (255 - 9 - len(absolute_path)) % 14
    os.mkdir(deep_path)
    for level in range(depth):
        deep_path = os.path.join(deep_path, f"deep_test_dir{level}")
        os.mkdir(deep_path)
    gen_file(os.path.join(deep_path, "deep_test"), 102400)


def add_prepare_source():
    """Create ``deep_test_dir`` and ``recv_test_dir``; raises if either already exists."""
    _make_deep_test_dir(os.path.join(GP.local_path, "deep_test_dir"))

    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
    print("generating recv test dir ...")
    os.mkdir(recv_dir)


def update_source():
    """Create ``deep_test_dir`` and ``recv_test_dir`` only where they are missing."""
    deep_path = os.path.join(GP.local_path, "deep_test_dir")
    if not os.path.exists(deep_path):
        _make_deep_test_dir(deep_path)

    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
    if not os.path.exists(recv_dir):
        print("generating recv test dir ...")
        os.mkdir(recv_dir)


def setup_tester():
    """Run the interactive main menu.

    Returns:
        True when the user chooses to proceed with the tests, False on
        exit or when customizing / loading testcases fails.
    """
    while True:
        GP.print_options()
        opt = int(select_cmd())
        if opt == 1:
            return True
        elif opt == 2:
            if not GP.set_options():
                return False
            GP.dump()
        elif opt == 3:
            prepare_source()
        elif opt == 4:
            if not GP.load_testcase():
                return False
        else:  # 5 (exit) or anything unexpected
            return False


def load_testcase():
    """Load the custom testcases through GP and report the outcome."""
    # The method has to be called: the bound method object itself is always truthy.
    if not GP.load_testcase():
        print("load testcase failed")
        return False
    print("load testcase success")
    return True


def check_library_installation(library_name):
    """Return 0 when the distribution ``library_name`` is installed, 1 otherwise.

    Uses importlib.metadata from the standard library instead of the
    deprecated pkg_resources.
    """
    from importlib import metadata

    try:
        metadata.distribution(library_name)
        return 0
    except metadata.PackageNotFoundError:
        print(f"\n\n{library_name} is not installed.\n\n")
        print(f"try to use command below:")
        print(f"pip install {library_name}")
        return 1


def check_subprocess_cmd(cmd, process_num, timeout):
    """Run ``cmd`` ``process_num`` times, killing any run that exceeds ``timeout`` seconds.

    NOTE(review): the runs are sequential - each one is waited for before
    the next starts.
    """
    for _ in range(process_num):
        p = subprocess.Popen(cmd.split())
        try:
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            p.kill()


def check_process_mixed(process_num, timeout, local, remote):
    """Stress hdc with concurrent transfers of the small, medium and empty files.

    For every round and file one send process and one recv process is
    prepared. All sends are started with random gaps of up to one second
    and joined, then the same is done for the receives. ``timeout``,
    ``local`` and ``remote`` are accepted for signature compatibility and unused.
    """
    list_send = []
    list_recv = []
    # A tuple keeps the start order identical from run to run.
    sizes = ("small", "medium", "empty")
    for i in range(process_num):
        for size in sizes:
            remote_file = get_remote_path(f"it_{size}_mix_{i}")
            cmd_send = f"file send {get_local_path(size)} {remote_file}"
            cmd_recv = f"file recv {remote_file} {get_local_path(f'recv_{size}_mix_{i}')}"
            list_send.append(Process(target=check_hdc_cmd, args=(cmd_send, )))
            list_recv.append(Process(target=check_hdc_cmd, args=(cmd_recv, )))
            logging.info(f"RESULT:{cmd_send}")  # log the command that was queued
    for send in list_send:
        wait_time = random.uniform(0, 1)
        send.start()
        time.sleep(wait_time)
    for send in list_send:
        send.join()

    for recv in list_recv:
        wait_time = random.uniform(0, 1)
        recv.start()
        time.sleep(wait_time)
    for recv in list_recv:
        recv.join()
    wait_time = random.uniform(0, 1)
    time.sleep(wait_time)


def execute_lines_in_file(file_path):
    """Run the commands listed in a ``<repeat count>,<command>`` text file.

    A missing file is created with the single line ``1,hdc shell ls``. A
    leading ``hdc`` in a command is replaced by ``GP.hdc_head``. Blank
    lines are skipped, and the command may itself contain commas.
    """
    if not os.path.exists(file_path):
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(file_path, flags, modes), 'w') as file:
            file.write("1,hdc shell ls")
    with open(file_path, 'r') as file:
        lines = file.readlines()
    for line in lines:
        if not line.strip():
            continue
        # Split on the first comma only so the command keeps its own commas.
        test_time, _, test_cmd = line.partition(',')
        if test_cmd.startswith("hdc"):
            # Swap just the leading "hdc"; later occurrences belong to arguments.
            test_cmd = f"{GP.hdc_head} {test_cmd[len('hdc'):].lstrip()}"

        for i in range(int(test_time)):
            logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}")
            output = subprocess.check_output(test_cmd.split()).decode()
            logging.info(f"RESULT:{output}")  # log the output of the command


def _collect_outputs(procs, timeout=512):
    """Wait for every process and return a list of decoded ``(stdout, stderr)`` pairs.

    communicate() drains the pipes while waiting, so a chatty child cannot
    block on a full pipe. A child exceeding ``timeout`` seconds is killed
    and contributes whatever it printed until then.
    """
    results = []
    for proc in procs:
        try:
            out, err = proc.communicate(timeout=timeout)  # timeout wait 512s
        except subprocess.TimeoutExpired:
            proc.kill()
            out, err = proc.communicate()
        results.append((out.decode(), err.decode()))
    return results


def make_multiprocess_file(local, remote, mode, num, task_type):
    """Run ``num`` parallel transfers and verify the results.

    Args:
        local: local file or directory.
        remote: remote file or directory.
        mode: "send" or "recv".
        num: number of parallel hdc processes, at least 1.
        task_type: "file" (targets get an ``_<i>`` suffix) or "dir" (every
            process transfers the same directory).

    Returns:
        True when every process printed "FileTransfer finish" and the MD5
        verification passed; False otherwise, including invalid arguments.
    """
    if num < 1:
        return False
    if task_type not in ("file", "dir") or mode not in ("send", "recv"):
        return False
    if task_type == "file":
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
    else:
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]
    print(file_list[0])
    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list]
    print(f"{mode} target {num} start")

    all_finished = True
    for stdout, stderr in _collect_outputs(p_list):
        if stderr:
            print(f"{stderr}")
        if stdout:
            print(f"{stdout}")
        if "FileTransfer finish" not in stdout:
            all_finished = False
    if not all_finished:
        return False

    if task_type == "file":
        if mode == "send":
            checks = [_check_file(local, f"{remote}_{i}") for i in range(num)]
        else:
            checks = [_check_file(f"{local}_{i}", f"{remote}_{i}") for i in range(num)]
        return all(checks)

    # Every process moved the same directory, so one comparison covers them
    # all; the target path is built once and does not grow per iteration.
    if mode == "send":
        return check_dir(local, f"{remote}/{os.path.basename(local)}", is_single_dir=True)
    received = os.path.join(local, os.path.basename(remote))
    return check_dir(f"{received}", f"{remote}", is_single_dir=True)


def hdc_get_key(cmd):
    """Run an hdc sub-command and return its decoded output."""
    test_cmd = f"{GP.hdc_head} {cmd}"
    result = subprocess.check_output(test_cmd.split()).decode()
    return result


def start_subprocess_cmd(cmd, num, assert_out):
    """Run ``num`` parallel copies of an hdc sub-command.

    Returns:
        False when ``num`` < 1 or, with ``assert_out`` given, when any
        process output lacks that text; True otherwise.
    """
    if num < 1:
        return False
    cmd_list = [f"{GP.hdc_head} {cmd}" for _ in range(num)]
    p_list = [subprocess.Popen(one.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for one in cmd_list]
    logging.info(f"{cmd} target {num} start")
    succeeded = True
    for stdout, stderr in _collect_outputs(p_list):
        if stderr:
            logging.error(f"{stderr}")
        if stdout:
            logging.info(f"{stdout}")
        if assert_out is not None and assert_out not in stdout:
            succeeded = False
    return succeeded


def check_hdc_version(cmd, version):
    """Check that the hdc version is not older than ``version``.

    Args:
        cmd: ignored; ``<hdc head> -v`` is always used.
        version: expected minimum, formatted like the tool output, i.e.
            containing ``"Ver: "`` followed by dot separated hex groups.

    Returns:
        True / False for the comparison, or None when ``version`` is None.
    """
    def _convert_version_to_hex(_version):
        # "Ver: 3.1.0a" -> int("310a", 16)
        parts = _version.split("Ver: ")[1].split('.')
        return int(''.join(parts), 16)

    cmd = f"{GP.hdc_head} -v"
    print(f"\nexecuting command: {cmd}")
    if version is None:  # nothing to compare against
        return None
    expected_version = _convert_version_to_hex(version)
    output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
    real_version = _convert_version_to_hex(output)
    print(f"--> output: {output}")
    print(f"--> your local [{version}] is"
          f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
          )
    return expected_version <= real_version


def check_cmd_time(cmd, pattern, duration, times):
    """Run an hdc sub-command ``times`` times and compare the average time with ``duration``.

    Args:
        cmd: hdc sub-command.
        pattern: text every output must contain, or None to skip the check.
        duration: allowed average in milliseconds; None means 180 ms.
        times: number of runs, at least 1.

    Raises:
        AssertionError: when ``pattern`` is given and a run does not print it.
    """
    if times < 1:
        print("times should be bigger than 0.")
        return False
    start_time = time.time() * 1000
    print(f"{cmd} start {start_time}")
    res = []
    for _ in range(times):
        start_in = time.time() * 1000
        if pattern is None:
            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
        elif not check_shell(cmd, pattern, fetch=False):
            # Raised explicitly: an assert statement is stripped under -O,
            # which would skip running the command altogether.
            raise AssertionError(f"pattern [{pattern}] not found in output of {cmd}")
        start_out = time.time() * 1000
        res.append(start_out - start_in)

    # Compute the maximum, the minimum and the median.
    max_value = max(res)
    min_value = min(res)
    median_value = sorted(res)[len(res) // 2]

    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")

    end_time = time.time() * 1000

    # times >= 1 is guaranteed above, so the division cannot fail.
    timecost = int(end_time - start_time) / times
    print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")

    if duration is None:
        duration = 150 * 1.2
    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
    return timecost < duration


def check_rom(baseline):
    """Check the combined size of hdcd and libhdc.dylib.so against ``baseline`` (KB).

    The result is also stored in ``GP.hdcd_rom`` as ``"<n> KB"``, or as
    ``"error"`` when a size could not be parsed.

    Returns:
        True when the total is within ``baseline`` (2200 when None).
    """
    def _try_get_size(message):
        # du prints "<size>\t<path>"; a large negative marks an unparsable answer.
        try:
            size = int(message.split('\t')[0])
        except ValueError:
            size = -9999 * 1024  # error size
            print(f"try get size value error, from {message}")
        return size

    def _du(path):
        return subprocess.check_output(f"{GP.hdc_head} shell du {path}".split()).decode()

    if baseline is None:
        baseline = 2200
    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
    hdcd_size = _try_get_size(_du("system/bin/hdcd"))
    result_libhdc = _du("system/lib/libhdc.dylib.so")
    if "directory" in result_libhdc:
        # Not found under lib: try lib64, and count 0 when it is absent there too.
        result_libhdc64 = _du("system/lib64/libhdc.dylib.so")
        if "directory" in result_libhdc64:
            libhdc_size = 0
        else:
            libhdc_size = _try_get_size(result_libhdc64)
    else:
        libhdc_size = _try_get_size(result_libhdc)
    all_size = hdcd_size + libhdc_size
    if all_size < 0:
        GP.hdcd_rom = "error"
        return False
    GP.hdcd_rom = f"{all_size} KB"
    if all_size > baseline:
        print(f"rom size is {all_size}, overlimit baseline {baseline}")
        return False
    print(f"rom size is {all_size}, underlimit baseline {baseline}")
    return True


def run_command_with_timeout(command, timeout):
    """Run ``command`` and return ``(stdout, stderr)`` as text.

    On timeout or a non-zero exit status the first element is None and the
    second one describes the problem.
    """
    try:
        result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
        return result.stdout.decode(), result.stderr.decode()
    except subprocess.TimeoutExpired:
        return None, "Command timed out"
    except subprocess.CalledProcessError as e:
        return None, e.stderr.decode()


def check_cmd_block(command, pattern, timeout=600):
    """Run ``command`` for at most ``timeout`` seconds and look for ``pattern`` in its stdout.

    A command still running after the timeout is killed; the output it
    produced until then is still searched.
    """
    # Start the child process.
    process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    try:
        # Read the output of the child process.
        output, _ = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.terminate()
        process.kill()
        output, _ = process.communicate(timeout=timeout)

    print(f"--> output: {output}")
    return pattern in output