1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2021 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16# 0. 运行环境: python 3.10+, pytest 17# 1. 修改 GP 中字段 18# 2. pytest [-k case_name_pattern] 19# eg. pytest -k file 执行方法名含 file 的用例 20 21# 打包 22# pip install pyinstaller 23# prepare assert source dir includes your data files 24# pyi-makespec -D --add-data assert:assert dev_hdc_test.py 25# pyinstaller dev_hdc_test.spec 26# 执行 dev_hdc_test.exe 27 28import csv 29import hashlib 30import json 31import logging 32import os 33import random 34import re 35import stat 36import shutil 37import subprocess 38import sys 39import threading 40import time 41from multiprocessing import Process 42 43import pytest 44import pkg_resources 45 46 47class GP(object): 48 """ Global Parameters 49 50 customize here !!! 51 """ 52 hdc_exe = "hdc" 53 local_path = "resource" 54 remote_path = "data/local/tmp" 55 remote_ip = "auto" 56 remote_port = 8710 57 hdc_head = "hdc" 58 device_name = "" 59 targets = [] 60 tmode = "usb" 61 changed_testcase = "n" 62 testcase_path = "ts_windows.csv" 63 loaded_testcase = 0 64 65 @classmethod 66 def init(cls): 67 logging.basicConfig(level=logging.INFO, 68 format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s', 69 datefmt='%d %b %Y %H:%M:%S', 70 ) 71 logging.basicConfig(level=logging.WARNING, 72 format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s', 73 datefmt='%d %b %Y %H:%M:%S', 74 ) 75 76 if os.path.exists(".hdctester.conf"): 77 cls.load() 78 cls.start_host() 79 cls.list_targets() 80 else: 81 cls.set_options() 82 cls.print_options() 83 cls.start_host() 84 cls.dump() 85 return 86 87 88 @classmethod 89 def start_host(cls): 90 cmd = f"{cls.hdc_exe} start" 91 res = subprocess.call(cmd.split()) 92 return res 93 94 95 @classmethod 96 def list_targets(cls): 97 try: 98 targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split() 99 except (OSError, IndexError): 100 targets = [b"failed to auto detect device"] 101 cls.targets = [targets[0].decode()] 102 return False 103 cls.targets = [t.decode() for t in targets] 104 return True 105 106 107 @classmethod 108 def get_device(cls): 109 cls.start_host() 110 cls.list_targets() 111 if len(cls.targets) > 1: 112 print("Multiple device detected, please select one:") 113 for i, t in enumerate(cls.targets): 114 print(f"{i+1}. {t}") 115 print("input the nums of the device above:") 116 cls.device_name = cls.targets[int(input()) - 1] 117 else: 118 cls.device_name = cls.targets[0] 119 if cls.device_name == "failed to auto detect device": 120 print("No device detected, please check your device connection") 121 return False 122 elif cls.device_name == "[empty]": 123 print("No hdc device detected.") 124 return False 125 cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}" 126 return True 127 128 129 @classmethod 130 def dump(cls): 131 try: 132 os.remove(".hdctester.conf") 133 except OSError: 134 pass 135 content = filter( 136 lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items()) 137 json_str = json.dumps(dict(content)) 138 fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755) 139 os.write(fd, json_str.encode()) 140 os.close(fd) 141 return True 142 143 144 @classmethod 145 def load(cls): 146 with open(".hdctester.conf") as f: 147 content = json.load(f) 148 cls.hdc_exe = content.get("hdc_exe") 149 cls.local_path = content.get("local_path") 150 cls.remote_path = content.get("remote_path") 151 cls.remote_ip = content.get("remote_ip") 152 cls.hdc_head = content.get("hdc_head") 153 cls.tmode = content.get("tmode") 154 cls.device_name = content.get("device_name") 155 cls.changed_testcase = content.get("changed_testcase") 156 cls.testcase_path = content.get("testcase_path") 157 cls.loaded_testcase = content.get("load_testcase") 158 return True 159 160 161 @classmethod 162 def print_options(cls): 163 info = "HDC Tester Default Options: \n\n" \ 164 + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \ 165 + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \ 166 + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \ 167 + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \ 168 + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \ 169 + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \ 170 + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \ 171 + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \ 172 + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \ 173 + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \ 174 + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n" 175 print(info) 176 177 178 @classmethod 179 def tconn_tcp(cls): 180 res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode() 181 if "Connect OK" in res: 182 return True 183 else: 184 return False 185 186 187 @classmethod 188 def set_options(cls): 189 if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip(): 190 cls.hdc_exe = opt 191 if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip(): 192 cls.local_path = opt 193 if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip(): 194 cls.remote_path = opt 195 if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip(): 196 cls.remote_ip = opt 197 if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip(): 198 cls.remote_port = int(opt) 199 if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip(): 200 cls.device_name = opt 201 if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip(): 202 cls.tmode = opt 203 if cls.tmode == "usb": 204 ret = cls.get_device() 205 if ret: 206 print("USB device detected.") 207 elif cls.tconn_tcp(): 208 cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}" 209 else: 210 print(f"tconn {cls.remote_ip}:{cls.remote_port} failed") 211 return False 212 return True 213 214 215 @classmethod 216 def change_testcase(cls): 217 if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip(): 218 cls.changed_testcase = opt 219 if opt == "n": 220 return False 221 if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip(): 222 cls.testcase_path = os.path.join(opt) 223 cls.print_options() 224 return True 225 226 227 @classmethod 228 def load_testcase(cls): 229 print("this fuction will coming soon.") 230 return False 231 232 @classmethod 233 def get_version(cls): 234 version = f"v1.0.4a" 235 return version 236 237 238def pytest_run(args): 239 file_list = [] 240 file_list.append("entry-default-signed-debug.hap") 241 file_list.append("libA_v10001.hsp") 242 file_list.append("libB_v10001.hsp") 243 for file in file_list: 244 if not os.path.exists(os.path.join(GP.local_path, file)): 245 print(f"No {file} File!") 246 print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。") 247 print("Please unzip package.zip to resource directory, please rerun after operation.") 248 input("[ENTER]") 249 return 250 gen_package_dir() 251 start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 252 if args.count is not None: 253 for i in range(args.count): 254 print(f"------------The {i}/{args.count} Test-------------") 255 timestamp = time.time() 256 pytest_args = [ 257 '--verbose', args.verbose, 258 '--report=report.html', 259 '--title=HDC Test Report 'f"{GP.get_version()}", 260 '--tester=tester001', 261 '--template=1', 262 '--desc='f"{args.verbose}:{args.desc}" 263 ] 264 pytest.main(pytest_args) 265 end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 266 report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time())) 267 report_dir = os.path.join(os.getcwd(), "reports") 268 report_file = os.path.join(report_dir, f"{report_time}report.html") 269 print(f"Test over, the script version is {GP.get_version()}," 270 f" start at {start_time}, end at {end_time} \n" 271 f"=======>{report_file} is saved. \n" 272 ) 273 input("=======>press [Enter] key to Show logs.") 274 275 276def rmdir(path): 277 try: 278 if sys.platform == "win32": 279 if os.path.isfile(path): 280 os.remove(path) 281 else: 282 shutil.rmtree(path) 283 else: 284 subprocess.call(f"rm -rf {path}".split()) 285 except OSError: 286 print(f"Error: {path} : cannot remove") 287 pass 288 289 290def get_local_path(path): 291 return os.path.join(GP.local_path, path) 292 293 294def get_remote_path(path): 295 return f"{GP.remote_path}/{path}" 296 297 298def get_local_md5(local): 299 md5_hash = hashlib.md5() 300 with open(local, "rb") as f: 301 for byte_block in iter(lambda: f.read(4096), b""): 302 md5_hash.update(byte_block) 303 return md5_hash.hexdigest() 304 305 306def check_shell(cmd, pattern=None, fetch=False): 307 cmd = f"{GP.hdc_head} {cmd}" 308 print(f"\nexecuting command: {cmd}") 309 if pattern: # check output valid 310 output = subprocess.check_output(cmd.split()).decode() 311 res = pattern in output 312 print(f"--> output: {output}") 313 print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") 314 return res 315 elif fetch: 316 output = subprocess.check_output(cmd.split()).decode() 317 print(f"--> output: {output}") 318 return output 319 else: # check cmd run successfully 320 return subprocess.check_call(cmd.split()) == 0 321 322 323def get_shell_result(cmd, pattern=None, fetch=False): 324 cmd = f"{GP.hdc_head} {cmd}" 325 print(f"\nexecuting command: {cmd}") 326 return subprocess.check_output(cmd.split()).decode() 327 328 329def check_rate(cmd, expected_rate): 330 send_result = get_shell_result(cmd) 331 rate = float(send_result.split("rate:")[1].split("kB/s")[0]) 332 return rate > expected_rate 333 334 335def check_dir(local, remote, is_single_dir=False): 336 def _get_md5sum(remote, is_single_dir=False): 337 if is_single_dir: 338 cmd = f"{GP.hdc_head} shell md5sum {remote}/*" 339 else: 340 cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \;' 341 result = subprocess.check_output(cmd.split()).decode() 342 return result 343 344 def _calculate_md5(file_path): 345 md5 = hashlib.md5() 346 try: 347 with open(file_path, 'rb') as f: 348 for chunk in iter(lambda: f.read(4096), b""): 349 md5.update(chunk) 350 return md5.hexdigest() 351 except PermissionError: 352 return "PermissionError" 353 except FileNotFoundError: 354 return "FileNotFoundError" 355 print("remote:" + remote) 356 output = _get_md5sum(remote) 357 print(output) 358 359 result = 1 360 for line in output.splitlines(): 361 if len(line) < 32: # length of MD5 362 continue 363 expected_md5, file_name = line.split()[:2] 364 if is_single_dir: 365 file_name = file_name.replace(f"{remote}", "") 366 elif GP.remote_path in remote: 367 file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\") 368 else: 369 file_name = file_name.split(remote)[1].replace("/", "\\") 370 file_path = os.path.join(os.getcwd(), GP.local_path) + file_name # 构建完整的文件路径 371 if is_single_dir: 372 file_path = os.path.join(os.getcwd(), local) + file_name 373 print(file_path) 374 actual_md5 = _calculate_md5(file_path) 375 print(f"Expected: {expected_md5}") 376 print(f"Actual: {actual_md5}") 377 print(f"MD5 matched {file_name}") 378 if actual_md5 != expected_md5: 379 print(f"[Fail]MD5 mismatch for {file_name}") 380 result *= 0 381 382 return (result == 1) 383 384 385def _check_file(local, remote): 386 if remote.startswith("/proc"): 387 local_size = os.path.getsize(local) 388 if local_size > 0: 389 return True 390 else: 391 return False 392 else: 393 cmd = f"shell md5sum {remote}" 394 local_md5 = get_local_md5(local) 395 return check_shell(cmd, local_md5) 396 397 398def _check_app_installed(bundle, is_shared=False): 399 dump = "dump-shared" if is_shared else "dump" 400 cmd = f"shell bm {dump} -a" 401 return check_shell(cmd, bundle) 402 403 404def check_hdc_targets(): 405 cmd = f"{GP.hdc_head} list targets" 406 print(GP.device_name) 407 return check_shell(cmd, GP.device_name) 408 409 410def check_file_send(local, remote): 411 local_path = os.path.join(GP.local_path, local) 412 remote_path = f"{GP.remote_path}/{remote}" 413 cmd = f"file send {local_path} {remote_path}" 414 return check_shell(cmd) and _check_file(local_path, remote_path) 415 416 417def check_file_recv(remote, local): 418 local_path = os.path.join(GP.local_path, local) 419 remote_path = f"{GP.remote_path}/{remote}" 420 cmd = f"file recv {remote_path} {local_path}" 421 return check_shell(cmd) and _check_file(local_path, remote_path) 422 423 424def check_app_install(app, bundle, args=""): 425 app = os.path.join(GP.local_path, app) 426 install_cmd = f"install {args} {app}" 427 if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")): 428 return check_shell(install_cmd, "failed to install bundle") 429 else: 430 return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args) 431 432 433def check_app_uninstall(bundle, args=""): 434 uninstall_cmd = f"uninstall {args} {bundle}" 435 return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args) 436 437 438def check_app_install_multi(tables, args=""): 439 apps = [] 440 bundles = [] 441 for app, bundle in tables.items() : 442 app = os.path.join(GP.local_path, app) 443 apps.append(app) 444 bundles.append(bundle) 445 446 apps_str = " ".join(apps) 447 install_cmd = f"install {args} {apps_str}" 448 449 if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str)) 450 or (args == "" and 0 == apps_str.count(".hap"))): 451 if not check_shell(install_cmd, "failed to install bundle"): 452 return False 453 else: 454 if not check_shell(install_cmd, "successfully"): 455 return False 456 457 for bundle in bundles: 458 if not _check_app_installed(bundle, "s" in args): 459 return False 460 461 return True 462 463 464def check_app_uninstall_multi(tables, args=""): 465 for app, bundle in tables.items() : 466 if not check_app_uninstall(bundle, args): 467 return False 468 469 return True 470 471 472def check_hdc_cmd(cmd, pattern=None, **args): 473 if cmd.startswith("file"): 474 if not check_shell(cmd, "FileTransfer finish"): 475 return False 476 if cmd.startswith("file send"): 477 local, remote = cmd.split()[-2:] 478 else: 479 remote, local = cmd.split()[-2:] 480 if os.path.isfile(local): 481 return _check_file(local, remote) 482 else: 483 return check_dir(local, remote) 484 elif cmd.startswith("install"): 485 bundle = args.get("bundle", "invalid") 486 opt = " ".join(cmd.split()[1:-1]) 487 return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt) 488 489 elif cmd.startswith("uninstall"): 490 bundle = cmd.split()[-1] 491 opt = " ".join(cmd.split()[1:-1]) 492 return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt) 493 494 else: 495 return check_shell(cmd, pattern, **args) 496 497 498def check_soft_local(local_source, local_soft, remote): 499 cmd = f"file send {local_soft} {remote}" 500 if not check_shell(cmd, "FileTransfer finish"): 501 return False 502 return _check_file(local_source, remote) 503 504 505def check_soft_remote(remote_source, remote_soft, local_recv): 506 check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}") 507 cmd = f"file recv {remote_soft} {local_recv}" 508 if not check_shell(cmd, "FileTransfer finish"): 509 return False 510 return _check_file(local_recv, get_remote_path(remote_source)) 511 512 513def switch_usb(): 514 res = check_hdc_cmd("tmode usb") 515 time.sleep(3) 516 if res: 517 GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}" 518 return res 519 520 521def copy_file(src, dst): 522 try: 523 shutil.copy2(src, dst) 524 print(f"File copied successfully from {src} to {dst}") 525 except IOError as e: 526 print(f"Unable to copy file. {e}") 527 except Exception as e: 528 print(f"Unexpected error: {e}") 529 530 531def switch_tcp(): 532 if not GP.remote_ip: # skip tcp check 533 print("!!! remote_ip is none, skip tcp check !!!") 534 return True 535 if GP.remote_ip == "auto": 536 ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True) 537 if not ipconf: 538 print("!!! device ip not found, skip tcp check !!!") 539 return True 540 GP.remote_ip = ipconf.split(":")[1].split()[0] 541 print(f"fetch remote ip: {GP.remote_ip}") 542 ret = check_hdc_cmd(f"tmode port {GP.remote_port}") 543 if ret: 544 time.sleep(3) 545 res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK") 546 if res: 547 GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}" 548 return res 549 550 551def select_cmd(): 552 msg = "1) Proceed tester\n" \ 553 + "2) Customize tester\n" \ 554 + "3) Setup files for transfer\n" \ 555 + "4) Load custom testcase(default unused) \n" \ 556 + "5) Exit\n" \ 557 + ">> " 558 559 while True: 560 opt = input(msg).strip() 561 if len(opt) == 1 and '1' <= opt <= '5': 562 return opt 563 564 565def gen_file(path, size): 566 index = 0 567 path = os.path.abspath(path) 568 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 569 570 while index < size: 571 os.write(fd, os.urandom(1024)) 572 index += 1024 573 os.close(fd) 574 575 576def gen_zero_file(path, size): 577 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 578 os.write(fd, b'0' * size) 579 os.close(fd) 580 581 582def create_file_with_size(path, size): 583 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 584 os.write(fd, b'\0' * size) 585 os.close(fd) 586 587 588def gen_file_set(): 589 print("generating empty file ...") 590 gen_file(os.path.join(GP.local_path, "empty"), 0) 591 592 print("generating small file ...") 593 gen_file(os.path.join(GP.local_path, "small"), 102400) 594 595 print("generating medium file ...") 596 gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2) 597 598 print("generating large file ...") 599 gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3) 600 601 print("generating soft link ...") 602 try: 603 os.symlink("small", os.path.join(GP.local_path, "soft_small")) 604 except FileExistsError: 605 print("soft_small already exists") 606 607 print("generating text file ...") 608 gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2) 609 610 print("generating package dir ...") 611 if not os.path.exists(os.path.join(GP.local_path, "package")): 612 os.mkdir(os.path.join(GP.local_path, "package")) 613 for i in range(1, 6): 614 gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2) 615 616 print("generating deep dir ...") 617 deepth = 4 618 deep_path = os.path.join(GP.local_path, "deep_dir") 619 if not os.path.exists(deep_path): 620 os.mkdir(deep_path) 621 for deep in range(deepth): 622 deep_path = os.path.join(deep_path, f"deep_dir{deep}") 623 if not os.path.exists(deep_path): 624 os.mkdir(deep_path) 625 gen_file(os.path.join(deep_path, "deep"), 102400) 626 627 print("generating dir with file ...") 628 dir_path = os.path.join(GP.local_path, "problem_dir") 629 rmdir(dir_path) 630 os.mkdir(dir_path) 631 gen_file(os.path.join(dir_path, "small2"), 102400) 632 633 fuzz_count = 47 # 47 is the count that circulated the file transfer 634 data_unit = 1024 # 1024 is the size that circulated the file transfer 635 data_extra = 936 # 936 is the size that cased the extra file transfer 636 for i in range(fuzz_count): 637 create_file_with_size( 638 os.path.join( 639 dir_path, f"file_{i * data_unit+data_extra}.dat" 640 ), i * data_unit + data_extra 641 ) 642 643 print("generating empty dir ...") 644 dir_path = os.path.join(GP.local_path, "empty_dir") 645 rmdir(dir_path) 646 os.mkdir(dir_path) 647 648 print("generating version file ...") 649 gen_file(os.path.join(GP.local_path, GP.get_version()), 0) 650 651 652def prepare_source(): 653 if os.path.exists(os.path.join(GP.local_path, GP.get_version())): 654 print(f"hdc test version is {GP.get_version}, check ok, skip prepare.") 655 return 656 print(f"in prepare {GP.local_path},wait for 2 mins.") 657 current_path = os.getcwd() 658 659 if os.path.exists(GP.local_path): 660 # 打开local_path遍历其中的文件,删除hap hsp以外的所有文件 661 for file in os.listdir(GP.local_path): 662 if file.endswith(".hap") or file.endswith(".hsp"): 663 continue 664 file_path = os.path.join(GP.local_path, file) 665 rmdir(file_path) 666 else: 667 os.mkdir(GP.local_path) 668 669 gen_file_set() 670 671 672def gen_package_dir(): 673 print("generating app dir ...") 674 dir_path = os.path.join(GP.local_path, "app_dir") 675 rmdir(dir_path) 676 os.mkdir(dir_path) 677 app = os.path.join(GP.local_path, "entry-default-signed-debug.hap") 678 dst_dir = os.path.join(GP.local_path, "app_dir") 679 if not os.path.exists(app): 680 print(f"Source file {app} does not exist.") 681 else: 682 copy_file(app, dst_dir) 683 684 685def setup_tester(): 686 while True: 687 GP.print_options() 688 opt = int(select_cmd()) 689 if opt == 1: 690 return True 691 elif opt == 2: 692 if not GP.set_options(): 693 return False 694 GP.dump() 695 elif opt == 3: 696 prepare_source() 697 elif opt == 4: 698 if not GP.load_testcase(): 699 return False 700 elif opt == 5: 701 return False 702 else: 703 return False 704 705 706def load_testcase(): 707 if not GP.load_testcase: 708 print("load testcase failed") 709 return False 710 print("load testcase success") 711 return True 712 713 714def check_library_installation(library_name): 715 try: 716 pkg_resources.get_distribution(library_name) 717 return 0 718 except pkg_resources.DistributionNotFound: 719 print(f"\n\n{library_name} is not installed.\n\n") 720 print(f"try to use command below:") 721 print(f"pip install {library_name}") 722 return 1 723 724 725def check_subprocess_cmd(cmd, process_num, timeout): 726 727 for i in range(process_num): 728 p = subprocess.Popen(cmd.split()) 729 try: 730 p.wait(timeout=5) 731 except subprocess.TimeoutExpired: 732 p.kill() 733 734 735def check_process_mixed(process_num, timeout, local, remote): 736 multi_num = process_num 737 list_send = [] 738 list_recv = [] 739 sizes = {"small", "medium", "empty"} 740 for i in range(multi_num): 741 for size in sizes: 742 cmd_send = f"file send {get_local_path(f'{size}')} {get_remote_path(f'it_{size}_mix_{i}')}" 743 cmd_recv = f"file recv {get_remote_path(f'it_{size}_mix_{i}')} {get_local_path(f'recv_{size}_mix_{i}')}" 744 list_send.append(Process(target=check_hdc_cmd, args=(cmd_send, ))) 745 list_recv.append(Process(target=check_hdc_cmd, args=(cmd_recv, ))) 746 logging.info(f"RESULT:{cmd_send}") # 打印命令的输出 747 for send in list_send: 748 wait_time = random.uniform(0, 1) 749 send.start() 750 time.sleep(wait_time) 751 for send in list_send: 752 send.join() 753 754 for recv in list_recv: 755 wait_time = random.uniform(0, 1) 756 recv.start() 757 time.sleep(wait_time) 758 for recv in list_recv: 759 recv.join() 760 wait_time = random.uniform(0, 1) 761 time.sleep(wait_time) 762 763 764def execute_lines_in_file(file_path): 765 if not os.path.exists(file_path): 766 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL 767 modes = stat.S_IWUSR | stat.S_IRUSR 768 with os.fdopen(os.open(file_path, flags, modes), 'w') as file: 769 file.write("1,hdc shell ls") 770 with open(file_path, 'r') as file: 771 lines = file.readlines() 772 for line in lines: 773 test_time = line.split(',')[0] 774 test_cmd = line.split(',')[1] 775 pattern = r"^hdc" 776 match = re.search(pattern, test_cmd) 777 if match: 778 result = test_cmd.replace(match.group(0), "").lstrip() 779 test_cmd = f"{GP.hdc_head} {result}" 780 781 for i in range(int(test_time)): 782 logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}") 783 output = subprocess.check_output(test_cmd.split()).decode() 784 logging.info(f"RESULT:{output}") # 打印命令的输出 785 786 787def make_multiprocess_file(local, remote, mode, num, task_type): 788 if num < 1: 789 return False 790 if task_type == "file": 791 if mode == "send" : 792 file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)] 793 elif mode == "recv": 794 file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)] 795 else: 796 return False 797 if task_type == "dir": 798 if mode == "send" : 799 file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)] 800 elif mode == "recv": 801 file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)] 802 else: 803 return False 804 print(file_list[0]) 805 p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list] 806 print(f"{mode} target {num} start") 807 while(len(p_list)): 808 for p in p_list: 809 if p.poll() is not None: 810 stdout, stderr = p.communicate(timeout=512) # timeout wait 512s 811 if stderr: 812 print(f"{stderr.decode()}") 813 if stdout: 814 print(f"{stdout.decode()}") 815 if stdout.decode().find("FileTransfer finish") == -1: 816 return False 817 p_list.remove(p) 818 res = 1 819 if task_type == "file": 820 for i in range(num): 821 if mode == "send": 822 if _check_file(local, f"{remote}_{i}"): 823 res *= 1 824 else: 825 res *= 0 826 elif mode == "recv": 827 if _check_file(f"{local}_{i}", f"{remote}_{i}"): 828 res *= 1 829 else: 830 res *= 0 831 if task_type == "dir": 832 for _ in range(num): 833 if mode == "send": 834 end_of_file_name = os.path.basename(local) 835 if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True): 836 res *= 1 837 else: 838 res *= 0 839 elif mode == "recv": 840 end_of_file_name = os.path.basename(remote) 841 local = os.path.join(local, end_of_file_name) 842 if check_dir(f"{local}", f"{remote}", is_single_dir=True): 843 res *= 1 844 else: 845 res *= 0 846 return res == 1 847 848 849def hdc_get_key(cmd): 850 test_cmd = f"{GP.hdc_head} {cmd}" 851 result = subprocess.check_output(test_cmd.split()).decode() 852 return result 853 854 855def start_subprocess_cmd(cmd, num, assert_out): 856 if num < 1: 857 return False 858 cmd_list = [f"{GP.hdc_head} {cmd}" for _ in range(num)] 859 p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in cmd_list] 860 logging.info(f"{cmd} target {num} start") 861 while(len(p_list)): 862 for p in p_list: 863 if p.poll() is not None: 864 stdout, stderr = p.communicate(timeout=512) 865 if stderr: 866 logging.error(f"{stderr.decode()}") 867 if stdout: 868 logging.info(f"{stdout.decode()}") 869 if assert_out is not None and stdout.decode().find(assert_out) == -1: 870 return False 871 p_list.remove(p) 872 return True 873 874 875def check_hdc_version(cmd, version): 876 877 def _convert_version_to_hex(_version): 878 parts = _version.split("Ver: ")[1].split('.') 879 hex_version = ''.join(parts) 880 return int(hex_version, 16) 881 882 expected_version = _convert_version_to_hex(version) 883 cmd = f"{GP.hdc_head} -v" 884 print(f"\nexecuting command: {cmd}") 885 if version is not None: # check output valid 886 output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "") 887 real_version = _convert_version_to_hex(output) 888 print(f"--> output: {output}") 889 print(f"--> your local [{version}] is" 890 f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]" 891 ) 892 return expected_version <= real_version 893 894 895def check_cmd_time(cmd, pattern, duration, times): 896 if times < 1: 897 print("times should be bigger than 0.") 898 return False 899 if pattern is None: 900 fetchable = True 901 else: 902 fetchable = False 903 start_time = time.time() * 1000 904 print(f"{cmd} start {start_time}") 905 res = [] 906 for i in range(times): 907 start_in = time.time() * 1000 908 if pattern is None: 909 subprocess.check_output(f"{GP.hdc_head} {cmd}".split()) 910 else: 911 check_shell(cmd, pattern, fetch=fetchable) 912 start_out = time.time() * 1000 913 res.append(start_out - start_in) 914 915 # 计算最大值、最小值和中位数 916 max_value = max(res) 917 min_value = min(res) 918 median_value = sorted(res)[len(res) // 2] 919 920 print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}") 921 print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}") 922 print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}") 923 924 end_time = time.time() * 1000 925 926 try: 927 timecost = int(end_time - start_time) / times 928 print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}") 929 except ZeroDivisionError: 930 print(f"除数为0") 931 932 if duration is None: 933 duration = 150 * 1.2 934 # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status 935 return timecost < duration 936 937 938def check_rom(baseline): 939 940 def _try_get_size(message): 941 try: 942 size = int(message.split('\t')[0]) 943 except ValueError: 944 size = -9999 * 1024 # error size 945 print(f"try get size value error, from {message}") 946 return size 947 948 if baseline is None: 949 baseline = 2200 950 # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together 951 cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd" 952 result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode() 953 hdcd_size = _try_get_size(result_hdcd) 954 cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so" 955 result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode() 956 if "directory" in result_libhdc: 957 cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so" 958 result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode() 959 if "directory" in result_libhdc64: 960 libhdc_size = 0 961 else: 962 libhdc_size = _try_get_size(result_libhdc64) 963 else: 964 libhdc_size = _try_get_size(result_libhdc) 965 all_size = hdcd_size + libhdc_size 966 GP.hdcd_rom = all_size 967 if all_size < 0: 968 GP.hdcd_rom = "error" 969 return False 970 else: 971 GP.hdcd_rom = f"{all_size} KB" 972 if all_size > baseline: 973 print(f"rom size is {all_size}, overlimit baseline {baseline}") 974 return False 975 else: 976 print(f"rom size is {all_size}, underlimit baseline {baseline}") 977 return True 978 979 980def run_command_with_timeout(command, timeout): 981 try: 982 result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) 983 return result.stdout.decode(), result.stderr.decode() 984 except subprocess.TimeoutExpired: 985 return None, "Command timed out" 986 except subprocess.CalledProcessError as e: 987 return None, e.stderr.decode() 988