1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2025 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 运行环境: python 3.10+, pytest 16 17import hashlib 18import json 19import os 20import re 21import shutil 22import subprocess 23import sys 24import time 25import tempfile 26import functools 27import logging 28import socket 29import platform 30import pytest 31import importlib 32 33logger = logging.getLogger(__name__) 34 35 36class GP(object): 37 """ Global Parameters 38 39 customize here !!! 40 """ 41 hdc_exe = "hdc" 42 local_path = "resource" 43 remote_path = "data/local/tmp" 44 remote_dir_path = "data/local/tmp/it_send_dir" 45 remote_ip = "auto" 46 remote_port = 8710 47 hdc_head = "hdc" 48 device_name = "" 49 targets = [] 50 tmode = "usb" 51 changed_testcase = "n" 52 testcase_path = "ts_windows.csv" 53 loaded_testcase = 0 54 hdcd_rom = "not checked" 55 debug_app = "com.example.myapplication" 56 57 @classmethod 58 def init(cls): 59 if os.path.exists(".hdctester.conf"): 60 cls.load() 61 cls.start_host() 62 cls.list_targets() 63 else: 64 cls.set_options() 65 cls.print_options() 66 cls.start_host() 67 cls.dump() 68 return 69 70 71 @classmethod 72 def start_host(cls): 73 cmd = f"{cls.hdc_exe} start" 74 res = subprocess.call(cmd.split()) 75 return res 76 77 @classmethod 78 def list_targets(cls): 79 try: 80 targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split() 81 except (OSError, IndexError): 82 targets = [b"failed to auto detect device"] 83 cls.targets = [targets[0].decode()] 84 return False 85 cls.targets = [t.decode() for t in targets] 86 return True 87 88 89 @classmethod 90 def get_device(cls): 91 cls.start_host() 92 cls.list_targets() 93 if len(cls.targets) > 1: 94 print("Multiple device detected, please select one:") 95 for i, t in enumerate(cls.targets): 96 print(f"{i+1}. {t}") 97 print("input the nums of the device above:") 98 cls.device_name = cls.targets[int(input()) - 1] 99 else: 100 cls.device_name = cls.targets[0] 101 if cls.device_name == "failed to auto detect device": 102 print("No device detected, please check your device connection") 103 return False 104 elif cls.device_name == "[empty]": 105 print("No hdc device detected.") 106 return False 107 cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}" 108 return True 109 110 111 @classmethod 112 def dump(cls): 113 try: 114 os.remove(".hdctester.conf") 115 except OSError: 116 pass 117 content = filter( 118 lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items()) 119 json_str = json.dumps(dict(content)) 120 fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755) 121 os.write(fd, json_str.encode()) 122 os.close(fd) 123 return True 124 125 126 @classmethod 127 def load(cls): 128 if not os.path.exists(".hdctester.conf"): 129 raise ConfigFileNotFoundException("No config file found, please run command [python prepare.py]") 130 with open(".hdctester.conf") as f: 131 content = json.load(f) 132 cls.hdc_exe = content.get("hdc_exe") 133 cls.local_path = content.get("local_path") 134 cls.remote_path = content.get("remote_path") 135 cls.remote_ip = content.get("remote_ip") 136 cls.hdc_head = content.get("hdc_head") 137 cls.tmode = content.get("tmode") 138 cls.device_name = content.get("device_name") 139 cls.changed_testcase = content.get("changed_testcase") 140 cls.testcase_path = content.get("testcase_path") 141 cls.loaded_testcase = content.get("load_testcase") 142 return True 143 144 145 @classmethod 146 def print_options(cls): 147 info = "HDC Tester Default Options: \n\n" \ 148 + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \ 149 + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \ 150 + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \ 151 + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \ 152 + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \ 153 + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \ 154 + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \ 155 + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \ 156 + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \ 157 + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \ 158 + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n" 159 print(info) 160 161 162 @classmethod 163 def tconn_tcp(cls): 164 res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode() 165 if "Connect OK" in res: 166 return True 167 else: 168 return False 169 170 171 @classmethod 172 def set_options(cls): 173 if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip(): 174 cls.hdc_exe = opt 175 if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip(): 176 cls.local_path = opt 177 if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip(): 178 cls.remote_path = opt 179 if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip(): 180 cls.remote_ip = opt 181 if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip(): 182 cls.remote_port = int(opt) 183 if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip(): 184 cls.device_name = opt 185 if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip(): 186 cls.tmode = opt 187 if cls.tmode == "usb": 188 ret = cls.get_device() 189 if ret: 190 print("USB device detected.") 191 elif cls.tconn_tcp(): 192 cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}" 193 else: 194 print(f"tconn {cls.remote_ip}:{cls.remote_port} failed") 195 return False 196 return True 197 198 199 @classmethod 200 def change_testcase(cls): 201 if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip(): 202 cls.changed_testcase = opt 203 if opt == "n": 204 return False 205 if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip(): 206 cls.testcase_path = os.path.join(opt) 207 cls.print_options() 208 return True 209 210 211 @classmethod 212 def load_testcase(cls): 213 print("this fuction will coming soon.") 214 return False 215 216 @classmethod 217 def get_version(cls): 218 version = f"v1.0.6a" 219 return version 220 221 222def pytest_run(): 223 start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 224 pytest.main() 225 end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 226 report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time())) 227 report_dir = os.path.join(os.getcwd(), "reports") 228 report_file = os.path.join(report_dir, f"{report_time}report.html") 229 print(f"Test over, the script version is {GP.get_version()}," 230 f" start at {start_time}, end at {end_time} \n" 231 f"=======>{report_file} is saved. \n" 232 ) 233 input("=======>press [Enter] key to Show logs.") 234 235 236def rmdir(path): 237 try: 238 if sys.platform == "win32": 239 if os.path.isfile(path) or os.path.islink(path): 240 os.remove(path) 241 else: 242 shutil.rmtree(path) 243 else: 244 subprocess.call(f"rm -rf {path}".split()) 245 except OSError: 246 print(f"Error: {path} : cannot remove") 247 pass 248 249 250def get_local_path(path): 251 return os.path.join(GP.local_path, path) 252 253 254def get_remote_path(path): 255 return f"{GP.remote_path}/{path}" 256 257 258def get_local_md5(local): 259 md5_hash = hashlib.md5() 260 with open(local, "rb") as f: 261 for byte_block in iter(lambda: f.read(4096), b""): 262 md5_hash.update(byte_block) 263 return md5_hash.hexdigest() 264 265 266def check_shell_any_device(cmd, pattern=None, fetch=False): 267 print(f"\nexecuting command: {cmd}") 268 if pattern: # check output valid 269 print("pattern case") 270 try: 271 output = subprocess.check_output(cmd.split()).decode('utf-8') 272 except UnicodeDecodeError: 273 output = subprocess.check_output(cmd.split()).decode('gbk') 274 res = pattern in output 275 print(f"--> output: {output}") 276 print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") 277 return res, output 278 elif fetch: 279 output = subprocess.check_output(cmd.split()).decode() 280 print(f"--> output: {output}") 281 return True, output 282 else: # check cmd run successfully 283 print("other case") 284 return subprocess.check_call(cmd.split()) == 0, "" 285 286 287def check_shell(cmd, pattern=None, fetch=False, head=None): 288 if head is None: 289 head = GP.hdc_head 290 cmd = f"{head} {cmd}" 291 print(f"\nexecuting command: {cmd}") 292 if pattern: # check output valid 293 output = subprocess.check_output(cmd.split()).decode() 294 res = pattern in output 295 print(f"--> output: {output}") 296 print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") 297 return res 298 elif fetch: 299 output = subprocess.check_output(cmd.split()).decode() 300 print(f"--> output: {output}") 301 return output 302 else: # check cmd run successfully 303 return subprocess.check_call(cmd.split()) == 0 304 305 306def get_shell_result(cmd, pattern=None, fetch=False): 307 cmd = f"{GP.hdc_head} {cmd}" 308 print(f"\nexecuting command: {cmd}") 309 return subprocess.check_output(cmd.split()).decode() 310 311 312def check_rate(cmd, expected_rate): 313 send_result = get_shell_result(cmd) 314 rate = float(send_result.split("rate:")[1].split("kB/s")[0]) 315 return rate > expected_rate 316 317 318def check_dir(local, remote, is_single_dir=False): 319 def _get_md5sum(remote, is_single_dir=False): 320 if is_single_dir: 321 cmd = f"{GP.hdc_head} shell md5sum {remote}/*" 322 else: 323 cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}}' 324 result = subprocess.check_output(cmd.split()).decode() 325 return result 326 327 def _calculate_md5(file_path): 328 md5 = hashlib.md5() 329 try: 330 with open(file_path, 'rb') as f: 331 for chunk in iter(lambda: f.read(4096), b""): 332 md5.update(chunk) 333 return md5.hexdigest() 334 except PermissionError: 335 return "PermissionError" 336 except FileNotFoundError: 337 return "FileNotFoundError" 338 print("remote:" + remote) 339 output = _get_md5sum(remote) 340 print(output) 341 342 result = 1 343 for line in output.splitlines(): 344 if len(line) < 32: # length of MD5 345 continue 346 expected_md5, file_name = line.split()[:2] 347 if is_single_dir: 348 file_name = file_name.replace(f"{remote}", "") 349 elif GP.remote_path in remote: 350 file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\") 351 else: 352 file_name = file_name.split(remote)[1].replace("/", "\\") 353 file_path = os.path.join(os.getcwd(), GP.local_path) + file_name # 构建完整的文件路径 354 if is_single_dir: 355 file_path = os.path.join(os.getcwd(), local) + file_name 356 print(file_path) 357 actual_md5 = _calculate_md5(file_path) 358 print(f"Expected: {expected_md5}") 359 print(f"Actual: {actual_md5}") 360 print(f"MD5 matched {file_name}") 361 if actual_md5 != expected_md5: 362 print(f"[Fail]MD5 mismatch for {file_name}") 363 result *= 0 364 365 return (result == 1) 366 367 368def _check_file(local, remote, head=None): 369 if head is None: 370 head = GP.hdc_head 371 if remote.startswith("/proc"): 372 local_size = os.path.getsize(local) 373 if local_size > 0: 374 return True 375 else: 376 return False 377 else: 378 cmd = f"shell md5sum {remote}" 379 local_md5 = get_local_md5(local) 380 return check_shell(cmd, local_md5, head=head) 381 382 383def _check_app_installed(bundle, is_shared=False): 384 dump = "dump-shared" if is_shared else "dump" 385 cmd = f"shell bm {dump} -a" 386 return check_shell(cmd, bundle) 387 388 389def check_hdc_targets(): 390 cmd = f"{GP.hdc_head} list targets" 391 print(GP.device_name) 392 return check_shell(cmd, GP.device_name) 393 394 395def check_file_send(local, remote): 396 local_path = os.path.join(GP.local_path, local) 397 remote_path = f"{GP.remote_path}/{remote}" 398 cmd = f"file send {local_path} {remote_path}" 399 return check_shell(cmd) and _check_file(local_path, remote_path) 400 401 402def check_file_recv(remote, local): 403 local_path = os.path.join(GP.local_path, local) 404 remote_path = f"{GP.remote_path}/{remote}" 405 cmd = f"file recv {remote_path} {local_path}" 406 return check_shell(cmd) and _check_file(local_path, remote_path) 407 408 409def check_app_install(app, bundle, args=""): 410 app = os.path.join(GP.local_path, app) 411 install_cmd = f"install {args} {app}" 412 if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")): 413 return check_shell(install_cmd, "failed to install bundle") 414 else: 415 return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args) 416 417 418def check_app_uninstall(bundle, args=""): 419 uninstall_cmd = f"uninstall {args} {bundle}" 420 return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args) 421 422 423def check_app_install_multi(tables, args=""): 424 apps = [] 425 bundles = [] 426 for app, bundle in tables.items() : 427 app = os.path.join(GP.local_path, app) 428 apps.append(app) 429 bundles.append(bundle) 430 431 apps_str = " ".join(apps) 432 install_cmd = f"install {args} {apps_str}" 433 434 if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str)) 435 or (args == "" and 0 == apps_str.count(".hap"))): 436 if not check_shell(install_cmd, "failed to install bundle"): 437 return False 438 else: 439 if not check_shell(install_cmd, "successfully"): 440 return False 441 442 for bundle in bundles: 443 if not _check_app_installed(bundle, "s" in args): 444 return False 445 446 return True 447 448 449def check_app_uninstall_multi(tables, args=""): 450 for app, bundle in tables.items() : 451 if not check_app_uninstall(bundle, args): 452 return False 453 454 return True 455 456 457def check_app_not_exist(app, bundle, args=""): 458 app = os.path.join(GP.local_path, app) 459 install_cmd = f"install {args} {app}" 460 if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")): 461 return check_shell(install_cmd, "Error opening file") 462 return False 463 464 465def check_app_dir_not_exist(app, bundle, args=""): 466 app = os.path.join(GP.local_path, app) 467 install_cmd = f"install {args} {app}" 468 return check_shell(install_cmd, "Not any installation package was found") 469 470 471def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args): 472 if head is None: 473 head = GP.hdc_head 474 if cmd.startswith("file"): 475 if not check_shell(cmd, "FileTransfer finish", head=head): 476 return False 477 if cmd.startswith("file send"): 478 local, remote = cmd.split()[-2:] 479 if remote[-1] == '/' or remote[-1] == '\\': 480 remote = f"{remote}{os.path.basename(local)}" 481 else: 482 remote, local = cmd.split()[-2:] 483 if local[-1] == '/' or local[-1] == '\\': 484 local = f"{local}{os.path.basename(remote)}" 485 if "-b" in cmd: 486 mnt_debug_path = "mnt/debug/100/debug_hap/" 487 remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}" 488 if os.path.isfile(local): 489 return _check_file(local, remote, head=head) 490 else: 491 return check_dir(local, remote, is_single_dir=is_single_dir) 492 elif cmd.startswith("install"): 493 bundle = args.get("bundle", "invalid") 494 opt = " ".join(cmd.split()[1:-1]) 495 return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt) 496 497 elif cmd.startswith("uninstall"): 498 bundle = cmd.split()[-1] 499 opt = " ".join(cmd.split()[1:-1]) 500 return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt) 501 502 else: 503 return check_shell(cmd, pattern, head=head, **args) 504 505 506def check_soft_local(local_source, local_soft, remote): 507 cmd = f"file send {local_soft} {remote}" 508 if not check_shell(cmd, "FileTransfer finish"): 509 return False 510 return _check_file(local_source, remote) 511 512 513def check_soft_remote(remote_source, remote_soft, local_recv): 514 check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}") 515 cmd = f"file recv {remote_soft} {local_recv}" 516 if not check_shell(cmd, "FileTransfer finish"): 517 return False 518 return _check_file(local_recv, get_remote_path(remote_source)) 519 520 521def switch_usb(): 522 res = check_hdc_cmd("tmode usb") 523 time.sleep(3) 524 if res: 525 GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}" 526 return res 527 528 529def copy_file(src, dst): 530 try: 531 shutil.copy2(src, dst) 532 print(f"File copied successfully from {src} to {dst}") 533 except IOError as e: 534 print(f"Unable to copy file. {e}") 535 except Exception as e: 536 print(f"Unexpected error: {e}") 537 538 539def switch_tcp(): 540 if not GP.remote_ip: # skip tcp check 541 print("!!! remote_ip is none, skip tcp check !!!") 542 return True 543 if GP.remote_ip == "auto": 544 ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True) 545 if not ipconf: 546 print("!!! device ip not found, skip tcp check !!!") 547 return True 548 GP.remote_ip = ipconf.split(":")[1].split()[0] 549 print(f"fetch remote ip: {GP.remote_ip}") 550 ret = check_hdc_cmd(f"tmode port {GP.remote_port}") 551 if ret: 552 time.sleep(3) 553 res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK") 554 if res: 555 GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}" 556 return res 557 558 559def select_cmd(): 560 msg = "1) Proceed tester\n" \ 561 + "2) Customize tester\n" \ 562 + "3) Setup files for transfer\n" \ 563 + "4) Load custom testcase(default unused) \n" \ 564 + "5) Exit\n" \ 565 + ">> " 566 567 while True: 568 opt = input(msg).strip() 569 if len(opt) == 1 and '1' <= opt <= '5': 570 return opt 571 572 573def gen_file(path, size): 574 index = 0 575 path = os.path.abspath(path) 576 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 577 578 while index < size: 579 os.write(fd, os.urandom(1024)) 580 index += 1024 581 os.close(fd) 582 583 584def gen_version_file(path): 585 with open(path, "w") as f: 586 f.write(GP.get_version()) 587 588 589def gen_zero_file(path, size): 590 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 591 os.write(fd, b'0' * size) 592 os.close(fd) 593 594 595def create_file_with_size(path, size): 596 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 597 os.write(fd, b'\0' * size) 598 os.close(fd) 599 600 601def gen_soft_link(): 602 print("generating soft link ...") 603 depth_1_path = os.path.join(GP.local_path, "d1") 604 depth_2_path = os.path.join(GP.local_path, "d1", "d2") 605 if not os.path.exists(os.path.join(GP.local_path, "d1")): 606 os.mkdir(depth_1_path) 607 os.mkdir(depth_2_path) 608 copy_file(os.path.join(GP.local_path, "small"), depth_2_path) 609 try: 610 os.symlink("small", os.path.join(GP.local_path, "soft_small")) 611 except FileExistsError: 612 print("soft_small already exists") 613 except OSError: 614 print("生成soft_small失败,需要使用管理员权限用户执行软链接生成") 615 try: 616 os.symlink("d1", os.path.join(GP.local_path, "soft_dir")) 617 except FileExistsError: 618 print("soft_dir already exists") 619 except OSError: 620 print("生成soft_dir失败,需要使用管理员权限用户执行软链接生成") 621 622 623def gen_file_set(): 624 print("generating empty file ...") 625 gen_file(os.path.join(GP.local_path, "empty"), 0) 626 627 print("generating small file ...") 628 gen_file(os.path.join(GP.local_path, "small"), 102400) 629 630 print("generating medium file ...") 631 gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2) 632 633 print("generating large file ...") 634 gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3) 635 636 print("generating text file ...") 637 gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2) 638 639 gen_soft_link() 640 print("generating package dir ...") 641 if not os.path.exists(os.path.join(GP.local_path, "package")): 642 os.mkdir(os.path.join(GP.local_path, "package")) 643 for i in range(1, 6): 644 gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2) 645 646 print("generating deep dir ...") 647 deepth = 4 648 deep_path = os.path.join(GP.local_path, "deep_dir") 649 if not os.path.exists(deep_path): 650 os.mkdir(deep_path) 651 for deep in range(deepth): 652 deep_path = os.path.join(deep_path, f"deep_dir{deep}") 653 if not os.path.exists(deep_path): 654 os.mkdir(deep_path) 655 gen_file(os.path.join(deep_path, "deep"), 102400) 656 657 print("generating dir with file ...") 658 dir_path = os.path.join(GP.local_path, "problem_dir") 659 rmdir(dir_path) 660 os.mkdir(dir_path) 661 gen_file(os.path.join(dir_path, "small2"), 102400) 662 663 fuzz_count = 47 # 47 is the count that circulated the file transfer 664 data_unit = 1024 # 1024 is the size that circulated the file transfer 665 data_extra = 936 # 936 is the size that cased the extra file transfer 666 for i in range(fuzz_count): 667 create_file_with_size( 668 os.path.join(dir_path, f"file_{i * data_unit+data_extra}.dat"), i * data_unit + data_extra) 669 print("generating empty dir ...") 670 dir_path = os.path.join(GP.local_path, "empty_dir") 671 rmdir(dir_path) 672 os.mkdir(dir_path) 673 print("generating version file ...") 674 gen_version_file(os.path.join(GP.local_path, "version")) 675 676 677def gen_package_dir(): 678 print("generating app dir ...") 679 dir_path = os.path.join(GP.local_path, "app_dir") 680 if os.path.exists(dir_path): 681 rmdir(dir_path) 682 os.mkdir(dir_path) 683 app = os.path.join(GP.local_path, "AACommand07.hap") 684 dst_dir = os.path.join(GP.local_path, "app_dir") 685 if not os.path.exists(app): 686 print(f"Source file {app} does not exist.") 687 else: 688 copy_file(app, dst_dir) 689 690 691def prepare_source(): 692 version_file = os.path.join(GP.local_path, "version") 693 if os.path.exists(version_file): 694 with open(version_file, "r") as f: 695 version = f.read() 696 if version == GP.get_version(): 697 print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.") 698 return 699 print(f"in prepare {GP.local_path},wait for 2 mins.") 700 current_path = os.getcwd() 701 702 if os.path.exists(GP.local_path): 703 #打开local_path遍历其中的文件,删除hap hsp以外的所有文件 704 for file in os.listdir(GP.local_path): 705 if file.endswith(".hap") or file.endswith(".hsp"): 706 continue 707 file_path = os.path.join(GP.local_path, file) 708 rmdir(file_path) 709 else: 710 os.mkdir(GP.local_path) 711 712 gen_file_set() 713 714 715def add_prepare_source(): 716 deep_path = os.path.join(GP.local_path, "deep_test_dir") 717 print("generating deep test dir ...") 718 absolute_path = os.path.abspath(__file__) 719 deepth = (255 - 9 - len(absolute_path)) % 14 720 os.mkdir(deep_path) 721 for deep in range(deepth): 722 deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") 723 os.mkdir(deep_path) 724 gen_file(os.path.join(deep_path, "deep_test"), 102400) 725 726 recv_dir = os.path.join(GP.local_path, "recv_test_dir") 727 print("generating recv test dir ...") 728 os.mkdir(recv_dir) 729 730 731def update_source(): 732 deep_path = os.path.join(GP.local_path, "deep_test_dir") 733 if not os.path.exists(deep_path): 734 print("generating deep test dir ...") 735 absolute_path = os.path.abspath(__file__) 736 deepth = (255 - 9 - len(absolute_path)) % 14 737 os.mkdir(deep_path) 738 for deep in range(deepth): 739 deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") 740 os.mkdir(deep_path) 741 gen_file(os.path.join(deep_path, "deep_test"), 102400) 742 743 recv_dir = os.path.join(GP.local_path, "recv_test_dir") 744 if not os.path.exists(recv_dir): 745 print("generating recv test dir ...") 746 os.mkdir(recv_dir) 747 748 749def load_testcase(): 750 if not GP.load_testcase: 751 print("load testcase failed") 752 return False 753 print("load testcase success") 754 return True 755 756 757def check_library_installation(library_name): 758 try: 759 importlib.metadata.version(library_name) 760 return 0 761 except importlib.metadata.PackageNotFoundError: 762 print(f"\n\n{library_name} is not installed.\n\n") 763 print(f"try to use command below:") 764 print(f"pip install {library_name}") 765 return 1 766 767 768def check_subprocess_cmd(cmd, process_num, timeout): 769 770 for i in range(process_num): 771 p = subprocess.Popen(cmd.split()) 772 try: 773 p.wait(timeout=5) 774 except subprocess.TimeoutExpired: 775 p.kill() 776 777 778def create_file_commands(local, remote, mode, num): 779 if mode == "send": 780 return [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)] 781 elif mode == "recv": 782 return [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)] 783 else: 784 return [] 785 786 787def create_dir_commands(local, remote, mode, num): 788 if mode == "send": 789 return [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)] 790 elif mode == "recv": 791 return [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)] 792 else: 793 return [] 794 795 796def execute_commands(commands): 797 processes = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in commands] 798 while processes: 799 for p in processes: 800 if not handle_process(p, processes, assert_out="FileTransfer finish"): 801 return False 802 return True 803 804 805def handle_process(p, processes, assert_out="FileTransfer finish"): 806 if p.poll() is not None: 807 stdout, stderr = p.communicate(timeout=512) # timeout wait 512s 808 if stderr: 809 print(f"{stderr.decode()}") 810 if stdout: 811 print(f"{stdout.decode()}") 812 if assert_out is not None and stdout.decode().find(assert_out) == -1: 813 return False 814 processes.remove(p) 815 return True 816 817 818def check_files(local, remote, mode, num): 819 res = 1 820 for i in range(num): 821 if mode == "send": 822 if _check_file(local, f"{remote}_{i}"): 823 res *= 1 824 else: 825 res *= 0 826 elif mode == "recv": 827 if _check_file(f"{local}_{i}", f"{remote}_{i}"): 828 res *= 1 829 else: 830 res *= 0 831 return res == 1 832 833 834def check_dirs(local, remote, mode, num): 835 res = 1 836 for _ in range(num): 837 if mode == "send": 838 end_of_file_name = os.path.basename(local) 839 if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True): 840 res *= 1 841 else: 842 res *= 0 843 elif mode == "recv": 844 end_of_file_name = os.path.basename(remote) 845 local = os.path.join(local, end_of_file_name) 846 if check_dir(f"{local}", f"{remote}", is_single_dir=True): 847 res *= 1 848 else: 849 res *= 0 850 return res == 1 851 852 853def make_multiprocess_file(local, remote, mode, num, task_type): 854 if num < 1: 855 return False 856 857 if task_type == "file": 858 commands = create_file_commands(local, remote, mode, num) 859 elif task_type == "dir": 860 commands = create_dir_commands(local, remote, mode, num) 861 else: 862 return False 863 864 print(commands[0]) 865 if not execute_commands(commands): 866 return False 867 868 if task_type == "file": 869 return check_files(local, remote, mode, num) 870 elif task_type == "dir": 871 return check_dirs(local, remote, mode, num) 872 else: 873 return False 874 875 876def hdc_get_key(cmd): 877 test_cmd = f"{GP.hdc_head} {cmd}" 878 result = subprocess.check_output(test_cmd.split()).decode() 879 return result 880 881 882def check_hdc_version(cmd, version): 883 884 def _convert_version_to_hex(_version): 885 parts = _version.split("Ver: ")[1].split('.') 886 hex_version = ''.join(parts) 887 return int(hex_version, 36) 888 889 expected_version = _convert_version_to_hex(version) 890 cmd = f"{GP.hdc_head} {cmd}" 891 print(f"\nexecuting command: {cmd}") 892 if version is not None: # check output valid 893 output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "") 894 real_version = _convert_version_to_hex(output) 895 print(f"--> output: {output}") 896 print(f"--> your local [{version}] is" 897 f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]" 898 ) 899 return expected_version <= real_version 900 901 902def check_cmd_time(cmd, pattern, duration, times): 903 if times < 1: 904 print("times should be bigger than 0.") 905 return False 906 if pattern is None: 907 fetchable = True 908 else: 909 fetchable = False 910 start_time = time.time() * 1000 911 print(f"{cmd} start {start_time}") 912 res = [] 913 for i in range(times): 914 start_in = time.time() * 1000 915 if pattern is None: 916 subprocess.check_output(f"{GP.hdc_head} {cmd}".split()) 917 elif not check_shell(cmd, pattern, fetch=fetchable): 918 return False 919 start_out = time.time() * 1000 920 res.append(start_out - start_in) 921 922 # 计算最大值、最小值和中位数 923 max_value = max(res) 924 min_value = min(res) 925 median_value = sorted(res)[len(res) // 2] 926 927 print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}") 928 print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}") 929 print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}") 930 931 end_time = time.time() * 1000 932 933 try: 934 timecost = int(end_time - start_time) / times 935 print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}") 936 except ZeroDivisionError: 937 print(f"除数为0") 938 939 if duration is None: 940 duration = 150 * 1.2 941 # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status 942 return timecost < duration 943 944 945def check_rom(baseline): 946 947 def _try_get_size(message): 948 try: 949 size = int(message.split('\t')[0]) 950 except ValueError: 951 size = -9999 * 1024 # error size 952 print(f"try get size value error, from {message}") 953 return size 954 955 if baseline is None: 956 baseline = 2200 957 # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together 958 cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd" 959 result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode() 960 hdcd_size = _try_get_size(result_hdcd) 961 cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so" 962 result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode() 963 if "directory" in result_libhdc: 964 cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so" 965 result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode() 966 if "directory" in result_libhdc64: 967 libhdc_size = 0 968 else: 969 libhdc_size = _try_get_size(result_libhdc64) 970 else: 971 libhdc_size = _try_get_size(result_libhdc) 972 all_size = hdcd_size + libhdc_size 973 GP.hdcd_rom = all_size 974 if all_size < 0: 975 GP.hdcd_rom = "error" 976 return False 977 else: 978 GP.hdcd_rom = f"{all_size} KB" 979 if all_size > baseline: 980 print(f"rom size is {all_size}, overlimit baseline {baseline}") 981 return False 982 else: 983 print(f"rom size is {all_size}, underlimit baseline {baseline}") 984 return True 985 986 987def run_command_with_timeout(command, timeout): 988 try: 989 result = subprocess.run(command.split(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) 990 return result.stdout.decode(), result.stderr.decode() 991 except subprocess.TimeoutExpired: 992 return "", "Command timed out" 993 except subprocess.CalledProcessError as e: 994 return "", e.stderr.decode() 995 996 997def check_cmd_block(command, pattern, timeout=600): 998 # 启动子进程 999 process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 1000 1001 # 用于存储子进程的输出 1002 output = "" 1003 1004 try: 1005 # 读取子进程的输出 1006 output, _ = process.communicate(timeout=timeout) 1007 except subprocess.TimeoutExpired: 1008 process.terminate() 1009 process.kill() 1010 output, _ = process.communicate(timeout=timeout) 1011 1012 print(f"--> output: {output}") 1013 if pattern in output: 1014 return True 1015 else: 1016 return False 1017 1018 1019def check_version(version): 1020 def decorator(func): 1021 @functools.wraps(func) 1022 def wrapper(*args, **kwargs): 1023 if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version): 1024 print("version does not match, ignore this case") 1025 pytest.skip("Version does not match, test skipped.") 1026 return func(*args, **kwargs) 1027 return wrapper 1028 return decorator 1029 1030 1031@pytest.fixture(scope='class', autouse=True) 1032def load_gp(request): 1033 GP.load() 1034 1035 1036class ConfigFileNotFoundException(Exception): 1037 """配置文件未找到异常""" 1038 pass 1039 1040 1041def get_hdcd_pss(): 1042 mem_string = get_shell_result("shell hidumper --mem `pidof hdcd`") 1043 print(f"--> hdcd mem: \n{mem_string}") 1044 pss_string = get_shell_result("shell hidumper --mem `pidof hdcd` | grep Total") 1045 if "Total" in pss_string: 1046 pss_value = int(re.sub(r"\s+", " ", pss_string.split('\n')[1]).split(' ')[2]) 1047 else: 1048 print("error: can't get pss value, message:%s", pss_string) 1049 pss_value = 0 1050 return pss_value 1051 1052 1053def get_hdcd_fd_count(): 1054 sep = '/' 1055 fd_string = get_shell_result(f"shell ls {sep}proc/`pidof hdcd`/fd | wc -l") 1056 print(f"--> hdcd fd cound: {fd_string}") 1057 try: 1058 end_symbol = get_end_symbol() 1059 fd_value = int(fd_string.split(end_symbol)[0]) 1060 except ValueError: 1061 fd_value = 0 1062 return fd_value 1063 1064 1065def get_end_symbol(): 1066 return sys.platform == 'win32' and "\r\n" or '\n' 1067 1068 1069def get_server_pid_from_file(): 1070 is_ohos = "Harmony" in platform.system() 1071 if not is_ohos: 1072 tmp_dir_path = tempfile.gettempdir() 1073 else: 1074 tmp_dir_path = os.path.expanduser("~") 1075 pid_file = os.path.join(tmp_dir_path, ".HDCServer.pid") 1076 with open(pid_file, "r") as f: 1077 pid = f.read() 1078 try: 1079 pid = int(pid) 1080 except ValueError: 1081 pid = 0 1082 print(f"--> pid of hdcserver is {pid}") 1083 return pid 1084 1085 1086def check_unsupport_systems(systems): 1087 def decorator(func): 1088 @functools.wraps(func) 1089 def wrapper(*args, **kwargs): 1090 cur_sys = platform.system() 1091 for system in systems: 1092 if system in cur_sys: 1093 print("system not support, ignore this case") 1094 pytest.skip("System not support, test skipped.") 1095 return func(*args, **kwargs) 1096 return wrapper 1097 return decorator 1098 1099 1100@check_unsupport_systems(["Harmony"]) 1101def get_cmd_block_output(command, timeout=600): 1102 # 启动子进程 1103 process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 1104 1105 # 用于存储子进程的输出 1106 output = "" 1107 1108 try: 1109 # 读取子进程的输出 1110 output, _ = process.communicate(timeout=timeout) 1111 except subprocess.TimeoutExpired: 1112 process.terminate() 1113 process.kill() 1114 output, _ = process.communicate(timeout=timeout) 1115 1116 print(f"--> output: {output}") 1117 return output 1118 1119 1120def get_cmd_block_output_and_error(command, timeout=600): 1121 print(f"cmd: {command}") 1122 # 启动子进程 1123 process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 1124 1125 # 用于存储子进程的输出 1126 output = "" 1127 error = "" 1128 1129 try: 1130 # 读取子进程的输出 1131 output, error = process.communicate(timeout=timeout) 1132 except subprocess.TimeoutExpired: 1133 process.terminate() 1134 process.kill() 1135 output, error = process.communicate(timeout=timeout) 1136 1137 print(f"--> output: {output}") 1138 print(f"--> error: {error}") 1139 return output, error 1140 1141 1142def send_file(conn, file_name): 1143 """ 1144 socket收发数据相关的功能函数,用于socket收发数据测试 1145 文件发送端发送文件到接收端 1146 1. Sender send file size(bytes string) to receiver 1147 2. The sender waits for the receiver to send back the file size 1148 3. The sender cyclically sends file data to the receiver 1149 """ 1150 logger.info(f"send_file enter:{file_name}") 1151 file_path = os.path.join(GP.local_path, file_name) 1152 logger.info(f"send_file file full name:{file_path}") 1153 file_size = os.path.getsize(file_path) 1154 logger.info(f"send_file file size:{file_size}") 1155 1156 # send size 1157 conn.send(str(file_size).encode('utf-8')) 1158 1159 # recv file size for check 1160 logger.info(f"send_file: start recv check size") 1161 size_raw = conn.recv(1024) 1162 logger.info(f"send_file: check size_raw {size_raw}") 1163 if len(size_raw) == 0: 1164 logger.error(f"send_file: recv check size len is 0, exit") 1165 return 1166 file_size_recv = int(size_raw.decode('utf-8')) 1167 if file_size_recv != file_size: 1168 logger.error(f"send_file: check size failed, file_size_recv:{file_size_recv} file size:{file_size}") 1169 return 1170 1171 logger.info(f"send_file start send file data") 1172 index = 0 1173 with open(file_path, 'rb') as f: 1174 while True: 1175 one_block = f.read(4096) 1176 if not one_block: 1177 logger.info(f"send_file index:{index} read 0 block") 1178 break 1179 conn.send(one_block) 1180 index = index + 1 1181 1182 1183def process_conn(conn, addr): 1184 """ 1185 socket收发数据相关的功能函数,用于socket收发数据测试 1186 Server client interaction description: 1187 1. client send "get [file_name]" to server 1188 2. server send file size(string) to client 1189 3. client send back size to server 1190 4. server send file data to client 1191 """ 1192 conn.settimeout(5) # timeout 5 second 1193 try: 1194 logger.info(f"server accept, addr:{str(addr)}") 1195 message = conn.recv(1024) 1196 message_str = message.decode('utf-8') 1197 logger.info(f"conn recv msg [{len(message_str)}] {message_str}") 1198 if len(message) == 0: 1199 conn.close() 1200 logger.info(f"conn msg len is 0, close {conn} addr:{addr}") 1201 return 1202 cmds = message_str.split() 1203 logger.info(f"conn cmds:{cmds}") 1204 cmd = cmds[0] 1205 if cmd == "get": # ['get', 'xxxx'] 1206 logger.info(f"conn cmd is get, file name:{cmds[1]}") 1207 file_name = cmds[1] 1208 send_file(conn, file_name) 1209 logger.info(f"conn normal close") 1210 except socket.timeout: 1211 logger.info(f"conn:{conn} comm timeout, addr:{addr}") 1212 except ConnectionResetError: 1213 logger.info(f"conn:{conn} ConnectionResetError, addr:{addr}") 1214 conn.close() 1215 1216 1217def server_loop(port_num): 1218 """ 1219 socket收发数据相关的功能函数,用于socket收发数据测试 1220 服务端,每次监听,接入连接,收发数据结束后关闭监听 1221 """ 1222 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1223 server_socket.bind(('localhost', port_num)) 1224 server_socket.listen() 1225 logger.info(f"server start listen {port_num}") 1226 server_socket.settimeout(10) # timeout 10 second 1227 1228 try: 1229 conn, addr = server_socket.accept() 1230 process_conn(conn, addr) 1231 except socket.timeout: 1232 logger.error(f"server accept timeout, port:{port_num}") 1233 1234 server_socket.close() 1235 logger.info(f"server exit, port:{port_num}") 1236 1237 1238def recv_file_data(client_socket, file_path, file_size): 1239 """ 1240 socket收发数据相关的功能函数,用于socket收发数据测试 1241 """ 1242 logger.info(f"client: start recv file data, file size:{file_size}, file path:{file_path}") 1243 with open(file_path, 'wb') as f: 1244 recv_size = 0 1245 while recv_size < file_size: 1246 one_block = client_socket.recv(4096) 1247 if not one_block: 1248 logger.info(f"client: read block size 0, exit") 1249 break 1250 f.write(one_block) 1251 recv_size += len(one_block) 1252 logger.info(f"client: recv file data finished, recv size:{recv_size}") 1253 1254 1255def client_get_file(port_num, file_name, file_save_name): 1256 """ 1257 socket收发数据相关的功能函数,用于socket收发数据测试 1258 """ 1259 logger.info(f"client: connect port:{port_num}") 1260 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1261 client_socket.settimeout(10) # timeout 10 second 1262 try: 1263 client_socket.connect(('localhost', port_num)) 1264 except socket.timeout: 1265 logger.info(f"client connect timeout, port:{port_num}") 1266 return 1267 1268 try: 1269 cmd = f"get {file_name}" 1270 logger.info(f"client: send cmd:{cmd}") 1271 client_socket.send(cmd.encode('utf-8')) 1272 1273 # recv file size 1274 size_raw = client_socket.recv(1024) 1275 logger.info(f"client: recv size_raw {size_raw}") 1276 if len(size_raw) == 0: 1277 logger.info(f"client: cmd:{cmd} recv size_raw len is 0, exit") 1278 return 1279 file_size = int(size_raw.decode('utf-8')) 1280 logger.info(f"client: file size {file_size}") 1281 1282 file_path = os.path.join(GP.local_path, file_save_name) 1283 if os.path.exists(file_path): 1284 logger.info(f"client: file {file_path} exist, delete") 1285 try: 1286 os.remove(file_path) 1287 except OSError as error: 1288 logger.info(f"delete {file_path} failed: {error.strerror}") 1289 1290 # send size msg to client for check 1291 logger.info(f"client: Send back file size:{size_raw}") 1292 client_socket.send(size_raw) 1293 recv_file_data(client_socket, file_path, file_size) 1294 except socket.timeout: 1295 logger.error(f"client communication timeout, port:{port_num}") 1296 return 1297 finally: 1298 logger.info("client socket close") 1299 client_socket.close() 1300 logger.info("client exit")