1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2021 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16# 0. 运行环境: python 3.10+, pytest 17# 1. 修改 GP 中字段 18# 2. pytest [-k case_name_pattern] 19# eg. pytest -k file 执行方法名含 file 的用例 20 21# 打包 22# pip install pyinstaller 23# prepare assert source dir includes your data files 24# pyi-makespec -D --add-data assert:assert dev_hdc_test.py 25# pyinstaller dev_hdc_test.spec 26# 执行 dev_hdc_test.exe 27 28import hashlib 29import json 30import os 31import shutil 32import subprocess 33import sys 34import time 35import csv 36import pytest 37import pkg_resources 38 39 40class GP(object): 41 """ Global Parameters 42 43 customize here !!! 44 """ 45 hdc_exe = "hdc" 46 local_path = "resource" 47 remote_path = "data/local/tmp" 48 remote_ip = "auto" 49 remote_port = 8710 50 hdc_head = "hdc" 51 device_name = "" 52 targets = [] 53 tmode = "usb" 54 changed_testcase = "n" 55 testcase_path = "ts_windows.csv" 56 loaded_testcase = 0 57 58 @classmethod 59 def init(cls): 60 cls.list_targets() 61 if os.path.exists(".hdctester.conf"): 62 cls.load() 63 return 64 else: 65 cls.set_options() 66 cls.print_options() 67 cls.dump() 68 return 69 70 71 @classmethod 72 def list_targets(cls): 73 try: 74 targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split() 75 except (OSError, IndexError): 76 targets = [b"failed to auto detect device"] 77 cls.targets = [targets[0].decode()] 78 return False 79 cls.targets = [t.decode() for t in targets] 80 return True 81 82 83 @classmethod 84 def get_device(cls): 85 if len(cls.targets) > 1: 86 print("Multiple device detected, please select one:") 87 for i, t in enumerate(cls.targets): 88 print(f"{i+1}. {t}") 89 print("input the nums of the device above:") 90 cls.device_name = cls.targets[int(input()) - 1] 91 else: 92 cls.device_name = cls.targets[0] 93 if cls.device_name == "failed to auto detect device": 94 print("No device detected, please check your device connection") 95 return False 96 elif cls.device_name == "[empty]": 97 print("No hdc device detected.") 98 return False 99 cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}" 100 return True 101 102 103 @classmethod 104 def dump(cls): 105 try: 106 os.remove(".hdctester.conf") 107 except OSError: 108 pass 109 content = filter( 110 lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items()) 111 json_str = json.dumps(dict(content)) 112 fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755) 113 os.write(fd, json_str.encode()) 114 os.close(fd) 115 return True 116 117 118 @classmethod 119 def load(cls): 120 with open(".hdctester.conf") as f: 121 content = json.load(f) 122 cls.hdc_exe = content.get("hdc_exe") 123 cls.local_path = content.get("local_path") 124 cls.remote_path = content.get("remote_path") 125 cls.remote_ip = content.get("remote_ip") 126 cls.hdc_head = content.get("hdc_head") 127 cls.tmode = content.get("tmode") 128 cls.device_name = content.get("device_name") 129 cls.changed_testcase = content.get("changed_testcase") 130 cls.testcase_path = content.get("testcase_path") 131 cls.loaded_testcase = content.get("load_testcase") 132 return True 133 134 135 @classmethod 136 def print_options(cls): 137 info = "HDC Tester Default Options: \n\n" \ 138 + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \ 139 + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \ 140 + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \ 141 + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \ 142 + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \ 143 + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \ 144 + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \ 145 + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \ 146 + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \ 147 + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \ 148 + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n" 149 print(info) 150 151 152 @classmethod 153 def tconn_tcp(cls): 154 res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode() 155 if "Connect OK" in res: 156 return True 157 else: 158 return False 159 160 161 @classmethod 162 def set_options(cls): 163 if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip(): 164 cls.hdc_exe = opt 165 if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip(): 166 cls.local_path = opt 167 if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip(): 168 cls.remote_path = opt 169 if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip(): 170 cls.remote_ip = opt 171 if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip(): 172 cls.remote_port = int(opt) 173 if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip(): 174 cls.device_name = opt 175 if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip(): 176 cls.tmode = opt 177 if cls.tmode == "usb": 178 ret = cls.get_device() 179 if ret: 180 print("USB device detected.") 181 elif cls.tconn_tcp(): 182 cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}" 183 else: 184 print(f"tconn {cls.remote_ip}:{cls.remote_port} failed") 185 return False 186 return True 187 188 189 @classmethod 190 def change_testcase(cls): 191 if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip(): 192 cls.changed_testcase = opt 193 if opt == "n": 194 return False 195 if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip(): 196 cls.testcase_path = os.path.join(opt) 197 cls.print_options() 198 return True 199 200 201 @classmethod 202 def load_testcase(cls): 203 print("this fuction will coming soon.") 204 return False 205 206 207def rmdir(path): 208 try: 209 if sys.platform == "win32": 210 if os.path.isfile(path): 211 subprocess.call(f"del {path}".split()) 212 else: 213 subprocess.call(f"del /Q {path}".split()) 214 else: 215 subprocess.call(f"rm -rf {path}".split()) 216 except OSError: 217 pass 218 219 220def get_local_path(path): 221 return os.path.join(GP.local_path, path) 222 223 224def get_remote_path(path): 225 return f"{GP.remote_path}/{path}" 226 227 228def _get_local_md5(local): 229 md5_hash = hashlib.md5() 230 with open(local, "rb") as f: 231 for byte_block in iter(lambda: f.read(4096), b""): 232 md5_hash.update(byte_block) 233 return md5_hash.hexdigest() 234 235 236def check_shell(cmd, pattern=None, fetch=False): 237 cmd = f"{GP.hdc_head} {cmd}" 238 print(f"\nexecuting command: {cmd}") 239 if pattern: # check output valid 240 output = subprocess.check_output(cmd.split()).decode() 241 res = pattern in output 242 print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") 243 return res 244 elif fetch: 245 output = subprocess.check_output(cmd.split()).decode() 246 print(f"--> output: {output}") 247 return output 248 else: # check cmd run successfully 249 return subprocess.check_call(cmd.split()) == 0 250 251 252def _check_file(local, remote): 253 cmd = f"shell md5sum {remote}" 254 local_md5 = _get_local_md5(local) 255 return check_shell(cmd, local_md5) 256 257 258def _check_app_installed(bundle, is_shared=False): 259 dump = "dump-shared" if is_shared else "dump" 260 cmd = f"shell bm {dump} -a" 261 return check_shell(cmd, bundle) 262 263 264def check_hdc_targets(): 265 cmd = f"{GP.hdc_head} list targets" 266 print(GP.device_name) 267 return check_shell(cmd, GP.device_name) 268 269 270def check_file_send(local, remote): 271 local_path = os.path.join(GP.local_path, local) 272 remote_path = f"{GP.remote_path}/{remote}" 273 cmd = f"file send {local_path} {remote_path}" 274 return check_shell(cmd) and _check_file(local_path, remote_path) 275 276 277def check_file_recv(remote, local): 278 local_path = os.path.join(GP.local_path, local) 279 remote_path = f"{GP.remote_path}/{remote}" 280 cmd = f"file recv {remote_path} {local_path}" 281 return check_shell(cmd) and _check_file(local_path, remote_path) 282 283 284def check_app_install(app, bundle, args=""): 285 app = os.path.join(GP.local_path, app) 286 install_cmd = f"install {args} {app}" 287 return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args) 288 289 290def check_app_uninstall(bundle, args=""): 291 uninstall_cmd = f"uninstall {args} {bundle}" 292 return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args) 293 294 295def check_hdc_cmd(cmd, pattern=None, **args): 296 if cmd.startswith("file"): 297 if not check_shell(cmd, "FileTransfer finish"): 298 return False 299 if cmd.startswith("file send"): 300 local, remote = cmd.split()[-2:] 301 else: 302 remote, local = cmd.split()[-2:] 303 return _check_file(local, remote) 304 305 elif cmd.startswith("install"): 306 bundle = args.get("bundle", "invalid") 307 opt = " ".join(cmd.split()[1:-1]) 308 return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt) 309 310 elif cmd.startswith("uninstall"): 311 bundle = cmd.split()[-1] 312 opt = " ".join(cmd.split()[1:-1]) 313 return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt) 314 315 else: 316 return check_shell(cmd, pattern, **args) 317 318 319def switch_usb(): 320 res = check_hdc_cmd("tmode usb") 321 time.sleep(3) 322 if res: 323 GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}" 324 return res 325 326 327def switch_tcp(): 328 if not GP.remote_ip: # skip tcp check 329 print("!!! remote_ip is none, skip tcp check !!!") 330 return True 331 if GP.remote_ip == "auto": 332 ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True) 333 if not ipconf: 334 print("!!! device ip not found, skip tcp check !!!") 335 return True 336 GP.remote_ip = ipconf.split(":")[1].split()[0] 337 print(f"fetch remote ip: {GP.remote_ip}") 338 ret = check_hdc_cmd(f"tmode port {GP.remote_port}") 339 if ret: 340 time.sleep(3) 341 res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK") 342 if res: 343 GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}" 344 return res 345 346 347def select_cmd(): 348 msg = "1) Proceed tester\n" \ 349 + "2) Customize tester\n" \ 350 + "3) Setup files for transfer\n" \ 351 + "4) Load custom testcase(default unused) \n" \ 352 + "5) Exit\n" \ 353 + ">> " 354 355 while True: 356 opt = input(msg).strip() 357 if len(opt) == 1 and '1' <= opt <= '5': 358 return opt 359 360 361def prepare_source(): 362 363 def gen_file(path, size): 364 index = 0 365 path = os.path.abspath(path) 366 fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) 367 368 while index < size: 369 os.write(fd, os.urandom(1024)) 370 index += 1024 371 os.close(fd) 372 373 print(f"in prepare {GP.local_path},wait for 2 mins.") 374 current_path = os.getcwd() 375 os.mkdir(GP.local_path) 376 print("generating empty file ...") 377 gen_file(os.path.join(GP.local_path, "empty"), 0) 378 379 print("generating small file ...") 380 gen_file(os.path.join(GP.local_path, "small"), 102400) 381 382 print("generating large file ...") 383 gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3) 384 385 print("generating dir with small file ...") 386 dir_path = os.path.join(GP.local_path, "normal_dir") 387 rmdir(dir_path) 388 os.mkdir(dir_path) 389 gen_file(os.path.join(dir_path, "small2"), 102400) 390 391 print("generating empty dir ...") 392 dir_path = os.path.join(GP.local_path, "empty_dir") 393 rmdir(dir_path) 394 os.mkdir(dir_path) 395 396 if os.path.exists("entry-default-signed-debug.hap"): 397 print("copy the hap file to resource dir...") 398 shutil.copy("entry-default-signed-debug.hap", GP.local_path) 399 else: 400 print("No hap File!") 401 if os.path.exists("panalyticshsp-default-signed.hsp"): 402 shutil.copy("panalyticshsp-default-signed.hsp", GP.local_path) 403 else: 404 print("No hsp File!") 405 406 407def setup_tester(): 408 while True: 409 GP.print_options() 410 opt = int(select_cmd()) 411 if opt == 1: 412 return True 413 elif opt == 2: 414 if not GP.set_options(): 415 return False 416 GP.dump() 417 elif opt == 3: 418 prepare_source() 419 elif opt == 4: 420 if not GP.load_testcase(): 421 return False 422 elif opt == 5: 423 return False 424 else: 425 return False 426 427 428def load_testcase(): 429 if not GP.load_testcase: 430 print("load testcase failed") 431 return False 432 print("load testcase success") 433 return True 434 435 436def check_library_installation(library_name): 437 try: 438 pkg_resources.get_distribution(library_name) 439 return 0 440 except pkg_resources.DistributionNotFound: 441 print(f"\n\n{library_name} is not installed.\n\n") 442 print(f"try to use command below:") 443 print(f"pip install {library_name}") 444 return 1