1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2025 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 运行环境: python 3.10+, pytest 16 17import hashlib 18import json 19import os 20import re 21import shutil 22import subprocess 23import sys 24import time 25import functools 26 27import pytest 28import importlib 29 30 31class GP(object): 32 """ Global Parameters 33 34 customize here !!! 35 """ 36 hdc_exe = "hdc" 37 local_path = "resource" 38 remote_path = "data/local/tmp" 39 remote_dir_path = "data/local/tmp/it_send_dir" 40 remote_ip = "auto" 41 remote_port = 8710 42 hdc_head = "hdc" 43 device_name = "" 44 targets = [] 45 tmode = "usb" 46 changed_testcase = "n" 47 testcase_path = "ts_windows.csv" 48 loaded_testcase = 0 49 hdcd_rom = "not checked" 50 debug_app = "com.example.myapplication" 51 52 @classmethod 53 def init(cls): 54 if os.path.exists(".hdctester.conf"): 55 cls.load() 56 cls.start_host() 57 cls.list_targets() 58 else: 59 cls.set_options() 60 cls.print_options() 61 cls.start_host() 62 cls.dump() 63 return 64 65 66 @classmethod 67 def start_host(cls): 68 cmd = f"{cls.hdc_exe} start" 69 res = subprocess.call(cmd.split()) 70 return res 71 72 @classmethod 73 def list_targets(cls): 74 try: 75 targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split() 76 except (OSError, IndexError): 77 targets = [b"failed to auto detect device"] 78 cls.targets = [targets[0].decode()] 79 return False 80 cls.targets = [t.decode() for t in targets] 81 return True 82 83 84 @classmethod 85 def get_device(cls): 86 cls.start_host() 87 cls.list_targets() 88 if len(cls.targets) > 1: 89 print("Multiple device detected, please select one:") 90 for i, t in enumerate(cls.targets): 91 print(f"{i+1}. {t}") 92 print("input the nums of the device above:") 93 cls.device_name = cls.targets[int(input()) - 1] 94 else: 95 cls.device_name = cls.targets[0] 96 if cls.device_name == "failed to auto detect device": 97 print("No device detected, please check your device connection") 98 return False 99 elif cls.device_name == "[empty]": 100 print("No hdc device detected.") 101 return False 102 cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}" 103 return True 104 105 106 @classmethod 107 def dump(cls): 108 try: 109 os.remove(".hdctester.conf") 110 except OSError: 111 pass 112 content = filter( 113 lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items()) 114 json_str = json.dumps(dict(content)) 115 fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755) 116 os.write(fd, json_str.encode()) 117 os.close(fd) 118 return True 119 120 121 @classmethod 122 def load(cls): 123 if not os.path.exists(".hdctester.conf"): 124 raise ConfigFileNotFoundException("No config file found, please run command [python prepare.py]") 125 with open(".hdctester.conf") as f: 126 content = json.load(f) 127 cls.hdc_exe = content.get("hdc_exe") 128 cls.local_path = content.get("local_path") 129 cls.remote_path = content.get("remote_path") 130 cls.remote_ip = content.get("remote_ip") 131 cls.hdc_head = content.get("hdc_head") 132 cls.tmode = content.get("tmode") 133 cls.device_name = content.get("device_name") 134 cls.changed_testcase = content.get("changed_testcase") 135 cls.testcase_path = content.get("testcase_path") 136 cls.loaded_testcase = content.get("load_testcase") 137 return True 138 139 140 @classmethod 141 def print_options(cls): 142 info = "HDC Tester Default Options: \n\n" \ 143 + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \ 144 + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \ 145 + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \ 146 + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \ 147 + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \ 148 + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \ 149 + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \ 150 + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \ 151 + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \ 152 + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \ 153 + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n" 154 print(info) 155 156 157 @classmethod 158 def tconn_tcp(cls): 159 res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode() 160 if "Connect OK" in res: 161 return True 162 else: 163 return False 164 165 166 @classmethod 167 def set_options(cls): 168 if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip(): 169 cls.hdc_exe = opt 170 if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip(): 171 cls.local_path = opt 172 if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip(): 173 cls.remote_path = opt 174 if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip(): 175 cls.remote_ip = opt 176 if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip(): 177 cls.remote_port = int(opt) 178 if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip(): 179 cls.device_name = opt 180 if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip(): 181 cls.tmode = opt 182 if cls.tmode == "usb": 183 ret = cls.get_device() 184 if ret: 185 print("USB device detected.") 186 elif cls.tconn_tcp(): 187 cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}" 188 else: 189 print(f"tconn {cls.remote_ip}:{cls.remote_port} failed") 190 return False 191 return True 192 193 194 @classmethod 195 def change_testcase(cls): 196 if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip(): 197 cls.changed_testcase = opt 198 if opt == "n": 199 return False 200 if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip(): 201 cls.testcase_path = os.path.join(opt) 202 cls.print_options() 203 return True 204 205 206 @classmethod 207 def load_testcase(cls): 208 print("this fuction will coming soon.") 209 return False 210 211 @classmethod 212 def get_version(cls): 213 version = f"v1.0.5a" 214 return version 215 216 217def pytest_run(): 218 start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 219 pytest.main() 220 end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 221 report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time())) 222 report_dir = os.path.join(os.getcwd(), "reports") 223 report_file = os.path.join(report_dir, f"{report_time}report.html") 224 print(f"Test over, the script version is {GP.get_version()}," 225 f" start at {start_time}, end at {end_time} \n" 226 f"=======>{report_file} is saved. \n" 227 ) 228 input("=======>press [Enter] key to Show logs.") 229 230 231def rmdir(path): 232 try: 233 if sys.platform == "win32": 234 if os.path.isfile(path): 235 os.remove(path) 236 else: 237 shutil.rmtree(path) 238 else: 239 subprocess.call(f"rm -rf {path}".split()) 240 except OSError: 241 print(f"Error: {path} : cannot remove") 242 pass 243 244 245def get_local_path(path): 246 return os.path.join(GP.local_path, path) 247 248 249def get_remote_path(path): 250 return f"{GP.remote_path}/{path}" 251 252 253def get_local_md5(local): 254 md5_hash = hashlib.md5() 255 with open(local, "rb") as f: 256 for byte_block in iter(lambda: f.read(4096), b""): 257 md5_hash.update(byte_block) 258 return md5_hash.hexdigest() 259 260 261def check_shell_any_device(cmd, pattern=None, fetch=False): 262 print(f"\nexecuting command: {cmd}") 263 if pattern: # check output valid 264 print("pattern case") 265 output = subprocess.check_output(cmd.split()).decode() 266 res = pattern in output 267 print(f"--> output: {output}") 268 print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") 269 return res, output 270 elif fetch: 271 output = subprocess.check_output(cmd.split()).decode() 272 print(f"--> output: {output}") 273 return True, output 274 else: # check cmd run successfully 275 print("other case") 276 return subprocess.check_call(cmd.split()) == 0, "" 277 278 279def check_shell(cmd, pattern=None, fetch=False, head=None): 280 if head is None: 281 head = GP.hdc_head 282 cmd = f"{head} {cmd}" 283 print(f"\nexecuting command: {cmd}") 284 if pattern: # check output valid 285 output = subprocess.check_output(cmd.split()).decode() 286 res = pattern in output 287 print(f"--> output: {output}") 288 print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") 289 return res 290 elif fetch: 291 output = subprocess.check_output(cmd.split()).decode() 292 print(f"--> output: {output}") 293 return output 294 else: # check cmd run successfully 295 return subprocess.check_call(cmd.split()) == 0 296 297 298def get_shell_result(cmd, pattern=None, fetch=False): 299 cmd = f"{GP.hdc_head} {cmd}" 300 print(f"\nexecuting command: {cmd}") 301 return subprocess.check_output(cmd.split()).decode() 302 303 304def check_rate(cmd, expected_rate): 305 send_result = get_shell_result(cmd) 306 rate = float(send_result.split("rate:")[1].split("kB/s")[0]) 307 return rate > expected_rate 308 309 310def check_dir(local, remote, is_single_dir=False): 311 def _get_md5sum(remote, is_single_dir=False): 312 if is_single_dir: 313 cmd = f"{GP.hdc_head} shell md5sum {remote}/*" 314 else: 315 cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}}' 316 result = subprocess.check_output(cmd.split()).decode() 317 return result 318 319 def _calculate_md5(file_path): 320 md5 = hashlib.md5() 321 try: 322 with open(file_path, 'rb') as f: 323 for chunk in iter(lambda: f.read(4096), b""): 324 md5.update(chunk) 325 return md5.hexdigest() 326 except PermissionError: 327 return "PermissionError" 328 except FileNotFoundError: 329 return "FileNotFoundError" 330 print("remote:" + remote) 331 output = _get_md5sum(remote) 332 print(output) 333 334 result = 1 335 for line in output.splitlines(): 336 if len(line) < 32: # length of MD5 337 continue 338 expected_md5, file_name = line.split()[:2] 339 if is_single_dir: 340 file_name = file_name.replace(f"{remote}", "") 341 elif GP.remote_path in remote: 342 file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\") 343 else: 344 file_name = file_name.split(remote)[1].replace("/", "\\") 345 file_path = os.path.join(os.getcwd(), GP.local_path) + file_name # 构建完整的文件路径 346 if is_single_dir: 347 file_path = os.path.join(os.getcwd(), local) + file_name 348 print(file_path) 349 actual_md5 = _calculate_md5(file_path) 350 print(f"Expected: {expected_md5}") 351 print(f"Actual: {actual_md5}") 352 print(f"MD5 matched {file_name}") 353 if actual_md5 != expected_md5: 354 print(f"[Fail]MD5 mismatch for {file_name}") 355 result *= 0 356 357 return (result == 1) 358 359 360def _check_file(local, remote, head=None): 361 if head is None: 362 head = GP.hdc_head 363 if remote.startswith("/proc"): 364 local_size = os.path.getsize(local) 365 if local_size > 0: 366 return True 367 else: 368 return False 369 else: 370 cmd = f"shell md5sum {remote}" 371 local_md5 = get_local_md5(local) 372 return check_shell(cmd, local_md5, head=head) 373 374 375def _check_app_installed(bundle, is_shared=False): 376 dump = "dump-shared" if is_shared else "dump" 377 cmd = f"shell bm {dump} -a" 378 return check_shell(cmd, bundle) 379 380 381def check_hdc_targets(): 382 cmd = f"{GP.hdc_head} list targets" 383 print(GP.device_name) 384 return check_shell(cmd, GP.device_name) 385 386 387def check_file_send(local, remote): 388 local_path = os.path.join(GP.local_path, local) 389 remote_path = f"{GP.remote_path}/{remote}" 390 cmd = f"file send {local_path} {remote_path}" 391 return check_shell(cmd) and _check_file(local_path, remote_path) 392 393 394def check_file_recv(remote, local): 395 local_path = os.path.join(GP.local_path, local) 396 remote_path = f"{GP.remote_path}/{remote}" 397 cmd = f"file recv {remote_path} {local_path}" 398 return check_shell(cmd) and _check_file(local_path, remote_path) 399 400 401def check_app_install(app, bundle, args=""): 402 app = os.path.join(GP.local_path, app) 403 install_cmd = f"install {args} {app}" 404 if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")): 405 return check_shell(install_cmd, "failed to install bundle") 406 else: 407 return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args) 408 409 410def check_app_uninstall(bundle, args=""): 411 uninstall_cmd = f"uninstall {args} {bundle}" 412 return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args) 413 414 415def check_app_install_multi(tables, args=""): 416 apps = [] 417 bundles = [] 418 for app, bundle in tables.items() : 419 app = os.path.join(GP.local_path, app) 420 apps.append(app) 421 bundles.append(bundle) 422 423 apps_str = " ".join(apps) 424 install_cmd = f"install {args} {apps_str}" 425 426 if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str)) 427 or (args == "" and 0 == apps_str.count(".hap"))): 428 if not check_shell(install_cmd, "failed to install bundle"): 429 return False 430 else: 431 if not check_shell(install_cmd, "successfully"): 432 return False 433 434 for bundle in bundles: 435 if not _check_app_installed(bundle, "s" in args): 436 return False 437 438 return True 439 440 441def check_app_uninstall_multi(tables, args=""): 442 for app, bundle in tables.items() : 443 if not check_app_uninstall(bundle, args): 444 return False 445 446 return True 447 448 449def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args): 450 if head is None: 451 head = GP.hdc_head 452 if cmd.startswith("file"): 453 if not check_shell(cmd, "FileTransfer finish", head=head): 454 return False 455 if cmd.startswith("file send"): 456 local, remote = cmd.split()[-2:] 457 if remote[-1] == '/' or remote[-1] == '\\': 458 remote = f"{remote}{os.path.basename(local)}" 459 else: 460 remote, local = cmd.split()[-2:] 461 if local[-1] == '/' or local[-1] == '\\': 462 local = f"{local}{os.path.basename(remote)}" 463 if "-b" in cmd: 464 mnt_debug_path = "mnt/debug/100/debug_hap/" 465 remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}" 466 if os.path.isfile(local): 467 return _check_file(local, remote, head=head) 468 else: 469 return check_dir(local, remote, is_single_dir=is_single_dir) 470 elif cmd.startswith("install"): 471 bundle = args.get("bundle", "invalid") 472 opt = " ".join(cmd.split()[1:-1]) 473 return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt) 474 475 elif cmd.startswith("uninstall"): 476 bundle = cmd.split()[-1] 477 opt = " ".join(cmd.split()[1:-1]) 478 return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt) 479 480 else: 481 return check_shell(cmd, pattern, head=head, **args) 482 483 484def check_soft_local(local_source, local_soft, remote): 485 cmd = f"file send {local_soft} {remote}" 486 if not check_shell(cmd, "FileTransfer finish"): 487 return False 488 return _check_file(local_source, remote) 489 490 491def check_soft_remote(remote_source, remote_soft, local_recv): 492 check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}") 493 cmd = f"file recv {remote_soft} {local_recv}" 494 if not check_shell(cmd, "FileTransfer finish"): 495 return False 496 return _check_file(local_recv, get_remote_path(remote_source)) 497 498 499def switch_usb(): 500 res = check_hdc_cmd("tmode usb") 501 time.sleep(3) 502 if res: 503 GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}" 504 return res 505 506 507def copy_file(src, dst): 508 try: 509 shutil.copy2(src, dst) 510 print(f"File copied successfully from {src} to {dst}") 511 except IOError as e: 512 print(f"Unable to copy file. {e}") 513 except Exception as e: 514 print(f"Unexpected error: {e}") 515 516 517def switch_tcp(): 518 if not GP.remote_ip: # skip tcp check 519 print("!!! remote_ip is none, skip tcp check !!!") 520 return True 521 if GP.remote_ip == "auto": 522 ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True) 523 if not ipconf: 524 print("!!! device ip not found, skip tcp check !!!") 525 return True 526 GP.remote_ip = ipconf.split(":")[1].split()[0] 527 print(f"fetch remote ip: {GP.remote_ip}") 528 ret = check_hdc_cmd(f"tmode port {GP.remote_port}") 529 if ret: 530 time.sleep(3) 531 res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK") 532 if res: 533 GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}" 534 return res 535 536 537def select_cmd(): 538 msg = "1) Proceed tester\n" \ 539 + "2) Customize tester\n" \ 540 + "3) Setup files for transfer\n" \ 541 + "4) Load custom testcase(default unused) \n" \ 542 + "5) Exit\n" \ 543 + ">> " 544 545 while True: 546 opt = input(msg).strip() 547 if len(opt) == 1 and '1' <= opt <= '5': 548 return opt 549 550 551def gen_file(path, size): 552 index = 0 553 path = os.path.abspath(path) 554 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 555 556 while index < size: 557 os.write(fd, os.urandom(1024)) 558 index += 1024 559 os.close(fd) 560 561 562def gen_version_file(path): 563 with open(path, "w") as f: 564 f.write(GP.get_version()) 565 566 567def gen_zero_file(path, size): 568 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 569 os.write(fd, b'0' * size) 570 os.close(fd) 571 572 573def create_file_with_size(path, size): 574 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 575 os.write(fd, b'\0' * size) 576 os.close(fd) 577 578 579def gen_soft_link(): 580 print("generating soft link ...") 581 try: 582 os.symlink("small", os.path.join(GP.local_path, "soft_small")) 583 except FileExistsError: 584 print("soft_small already exists") 585 except OSError: 586 print("生成soft_small失败,需要使用管理员权限用户执行软链接生成") 587 588 589def gen_file_set(): 590 print("generating empty file ...") 591 gen_file(os.path.join(GP.local_path, "empty"), 0) 592 593 print("generating small file ...") 594 gen_file(os.path.join(GP.local_path, "small"), 102400) 595 596 print("generating medium file ...") 597 gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2) 598 599 print("generating large file ...") 600 gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3) 601 602 print("generating text file ...") 603 gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2) 604 605 gen_soft_link() 606 print("generating package dir ...") 607 if not os.path.exists(os.path.join(GP.local_path, "package")): 608 os.mkdir(os.path.join(GP.local_path, "package")) 609 for i in range(1, 6): 610 gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2) 611 612 print("generating deep dir ...") 613 deepth = 4 614 deep_path = os.path.join(GP.local_path, "deep_dir") 615 if not os.path.exists(deep_path): 616 os.mkdir(deep_path) 617 for deep in range(deepth): 618 deep_path = os.path.join(deep_path, f"deep_dir{deep}") 619 if not os.path.exists(deep_path): 620 os.mkdir(deep_path) 621 gen_file(os.path.join(deep_path, "deep"), 102400) 622 623 print("generating dir with file ...") 624 dir_path = os.path.join(GP.local_path, "problem_dir") 625 rmdir(dir_path) 626 os.mkdir(dir_path) 627 gen_file(os.path.join(dir_path, "small2"), 102400) 628 629 fuzz_count = 47 # 47 is the count that circulated the file transfer 630 data_unit = 1024 # 1024 is the size that circulated the file transfer 631 data_extra = 936 # 936 is the size that cased the extra file transfer 632 for i in range(fuzz_count): 633 create_file_with_size( 634 os.path.join(dir_path, f"file_{i * data_unit+data_extra}.dat"), i * data_unit + data_extra) 635 print("generating empty dir ...") 636 dir_path = os.path.join(GP.local_path, "empty_dir") 637 rmdir(dir_path) 638 os.mkdir(dir_path) 639 print("generating version file ...") 640 gen_version_file(os.path.join(GP.local_path, "version")) 641 642 643def gen_package_dir(): 644 print("generating app dir ...") 645 dir_path = os.path.join(GP.local_path, "app_dir") 646 if os.path.exists(dir_path): 647 rmdir(dir_path) 648 os.mkdir(dir_path) 649 app = os.path.join(GP.local_path, "AACommand07.hap") 650 dst_dir = os.path.join(GP.local_path, "app_dir") 651 if not os.path.exists(app): 652 print(f"Source file {app} does not exist.") 653 else: 654 copy_file(app, dst_dir) 655 656 657def prepare_source(): 658 version_file = os.path.join(GP.local_path, "version") 659 if os.path.exists(version_file): 660 with open(version_file, "r") as f: 661 version = f.read() 662 if version == GP.get_version(): 663 print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.") 664 return 665 print(f"in prepare {GP.local_path},wait for 2 mins.") 666 current_path = os.getcwd() 667 668 if os.path.exists(GP.local_path): 669 #打开local_path遍历其中的文件,删除hap hsp以外的所有文件 670 for file in os.listdir(GP.local_path): 671 if file.endswith(".hap") or file.endswith(".hsp"): 672 continue 673 file_path = os.path.join(GP.local_path, file) 674 rmdir(file_path) 675 else: 676 os.mkdir(GP.local_path) 677 678 gen_file_set() 679 680 681def add_prepare_source(): 682 deep_path = os.path.join(GP.local_path, "deep_test_dir") 683 print("generating deep test dir ...") 684 absolute_path = os.path.abspath(__file__) 685 deepth = (255 - 9 - len(absolute_path)) % 14 686 os.mkdir(deep_path) 687 for deep in range(deepth): 688 deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") 689 os.mkdir(deep_path) 690 gen_file(os.path.join(deep_path, "deep_test"), 102400) 691 692 recv_dir = os.path.join(GP.local_path, "recv_test_dir") 693 print("generating recv test dir ...") 694 os.mkdir(recv_dir) 695 696 697def update_source(): 698 deep_path = os.path.join(GP.local_path, "deep_test_dir") 699 if not os.path.exists(deep_path): 700 print("generating deep test dir ...") 701 absolute_path = os.path.abspath(__file__) 702 deepth = (255 - 9 - len(absolute_path)) % 14 703 os.mkdir(deep_path) 704 for deep in range(deepth): 705 deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") 706 os.mkdir(deep_path) 707 gen_file(os.path.join(deep_path, "deep_test"), 102400) 708 709 recv_dir = os.path.join(GP.local_path, "recv_test_dir") 710 if not os.path.exists(recv_dir): 711 print("generating recv test dir ...") 712 os.mkdir(recv_dir) 713 714 715def load_testcase(): 716 if not GP.load_testcase: 717 print("load testcase failed") 718 return False 719 print("load testcase success") 720 return True 721 722 723def check_library_installation(library_name): 724 try: 725 importlib.metadata.version(library_name) 726 return 0 727 except importlib.metadata.PackageNotFoundError: 728 print(f"\n\n{library_name} is not installed.\n\n") 729 print(f"try to use command below:") 730 print(f"pip install {library_name}") 731 return 1 732 733 734def check_subprocess_cmd(cmd, process_num, timeout): 735 736 for i in range(process_num): 737 p = subprocess.Popen(cmd.split()) 738 try: 739 p.wait(timeout=5) 740 except subprocess.TimeoutExpired: 741 p.kill() 742 743 744def create_file_commands(local, remote, mode, num): 745 if mode == "send": 746 return [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)] 747 elif mode == "recv": 748 return [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)] 749 else: 750 return [] 751 752 753def create_dir_commands(local, remote, mode, num): 754 if mode == "send": 755 return [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)] 756 elif mode == "recv": 757 return [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)] 758 else: 759 return [] 760 761 762def execute_commands(commands): 763 processes = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in commands] 764 while processes: 765 for p in processes: 766 if not handle_process(p, processes, assert_out="FileTransfer finish"): 767 return False 768 return True 769 770 771def handle_process(p, processes, assert_out="FileTransfer finish"): 772 if p.poll() is not None: 773 stdout, stderr = p.communicate(timeout=512) # timeout wait 512s 774 if stderr: 775 print(f"{stderr.decode()}") 776 if stdout: 777 print(f"{stdout.decode()}") 778 if assert_out is not None and stdout.decode().find(assert_out) == -1: 779 return False 780 processes.remove(p) 781 return True 782 783 784def check_files(local, remote, mode, num): 785 res = 1 786 for i in range(num): 787 if mode == "send": 788 if _check_file(local, f"{remote}_{i}"): 789 res *= 1 790 else: 791 res *= 0 792 elif mode == "recv": 793 if _check_file(f"{local}_{i}", f"{remote}_{i}"): 794 res *= 1 795 else: 796 res *= 0 797 return res == 1 798 799 800def check_dirs(local, remote, mode, num): 801 res = 1 802 for _ in range(num): 803 if mode == "send": 804 end_of_file_name = os.path.basename(local) 805 if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True): 806 res *= 1 807 else: 808 res *= 0 809 elif mode == "recv": 810 end_of_file_name = os.path.basename(remote) 811 local = os.path.join(local, end_of_file_name) 812 if check_dir(f"{local}", f"{remote}", is_single_dir=True): 813 res *= 1 814 else: 815 res *= 0 816 return res == 1 817 818 819def make_multiprocess_file(local, remote, mode, num, task_type): 820 if num < 1: 821 return False 822 823 if task_type == "file": 824 commands = create_file_commands(local, remote, mode, num) 825 elif task_type == "dir": 826 commands = create_dir_commands(local, remote, mode, num) 827 else: 828 return False 829 830 print(commands[0]) 831 if not execute_commands(commands): 832 return False 833 834 if task_type == "file": 835 return check_files(local, remote, mode, num) 836 elif task_type == "dir": 837 return check_dirs(local, remote, mode, num) 838 else: 839 return False 840 841 842def hdc_get_key(cmd): 843 test_cmd = f"{GP.hdc_head} {cmd}" 844 result = subprocess.check_output(test_cmd.split()).decode() 845 return result 846 847 848def check_hdc_version(cmd, version): 849 850 def _convert_version_to_hex(_version): 851 parts = _version.split("Ver: ")[1].split('.') 852 hex_version = ''.join(parts) 853 return int(hex_version, 16) 854 855 expected_version = _convert_version_to_hex(version) 856 cmd = f"{GP.hdc_head} -v" 857 print(f"\nexecuting command: {cmd}") 858 if version is not None: # check output valid 859 output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "") 860 real_version = _convert_version_to_hex(output) 861 print(f"--> output: {output}") 862 print(f"--> your local [{version}] is" 863 f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]" 864 ) 865 return expected_version <= real_version 866 867 868def check_cmd_time(cmd, pattern, duration, times): 869 if times < 1: 870 print("times should be bigger than 0.") 871 return False 872 if pattern is None: 873 fetchable = True 874 else: 875 fetchable = False 876 start_time = time.time() * 1000 877 print(f"{cmd} start {start_time}") 878 res = [] 879 for i in range(times): 880 start_in = time.time() * 1000 881 if pattern is None: 882 subprocess.check_output(f"{GP.hdc_head} {cmd}".split()) 883 elif not check_shell(cmd, pattern, fetch=fetchable): 884 return False 885 start_out = time.time() * 1000 886 res.append(start_out - start_in) 887 888 # 计算最大值、最小值和中位数 889 max_value = max(res) 890 min_value = min(res) 891 median_value = sorted(res)[len(res) // 2] 892 893 print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}") 894 print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}") 895 print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}") 896 897 end_time = time.time() * 1000 898 899 try: 900 timecost = int(end_time - start_time) / times 901 print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}") 902 except ZeroDivisionError: 903 print(f"除数为0") 904 905 if duration is None: 906 duration = 150 * 1.2 907 # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status 908 return timecost < duration 909 910 911def check_rom(baseline): 912 913 def _try_get_size(message): 914 try: 915 size = int(message.split('\t')[0]) 916 except ValueError: 917 size = -9999 * 1024 # error size 918 print(f"try get size value error, from {message}") 919 return size 920 921 if baseline is None: 922 baseline = 2200 923 # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together 924 cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd" 925 result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode() 926 hdcd_size = _try_get_size(result_hdcd) 927 cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so" 928 result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode() 929 if "directory" in result_libhdc: 930 cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so" 931 result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode() 932 if "directory" in result_libhdc64: 933 libhdc_size = 0 934 else: 935 libhdc_size = _try_get_size(result_libhdc64) 936 else: 937 libhdc_size = _try_get_size(result_libhdc) 938 all_size = hdcd_size + libhdc_size 939 GP.hdcd_rom = all_size 940 if all_size < 0: 941 GP.hdcd_rom = "error" 942 return False 943 else: 944 GP.hdcd_rom = f"{all_size} KB" 945 if all_size > baseline: 946 print(f"rom size is {all_size}, overlimit baseline {baseline}") 947 return False 948 else: 949 print(f"rom size is {all_size}, underlimit baseline {baseline}") 950 return True 951 952 953def run_command_with_timeout(command, timeout): 954 try: 955 result = subprocess.run(command.split(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) 956 return result.stdout.decode(), result.stderr.decode() 957 except subprocess.TimeoutExpired: 958 return "", "Command timed out" 959 except subprocess.CalledProcessError as e: 960 return "", e.stderr.decode() 961 962 963def check_cmd_block(command, pattern, timeout=600): 964 # 启动子进程 965 process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 966 967 # 用于存储子进程的输出 968 output = "" 969 970 try: 971 # 读取子进程的输出 972 output, _ = process.communicate(timeout=timeout) 973 except subprocess.TimeoutExpired: 974 process.terminate() 975 process.kill() 976 output, _ = process.communicate(timeout=timeout) 977 978 print(f"--> output: {output}") 979 if pattern in output: 980 return True 981 else: 982 return False 983 984 985def check_version(version): 986 def decorator(func): 987 @functools.wraps(func) 988 def wrapper(*args, **kwargs): 989 if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version): 990 print("version does not match, ignore this case") 991 pytest.skip("Version does not match, test skipped.") 992 return func(*args, **kwargs) 993 return wrapper 994 return decorator 995 996 997@pytest.fixture(scope='class', autouse=True) 998def load_gp(request): 999 GP.load() 1000 1001 1002class ConfigFileNotFoundException(Exception): 1003 """配置文件未找到异常""" 1004 pass