• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2021 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# 0. 运行环境: python 3.10+, pytest
17# 1. 修改 GP 中字段
18# 2. pytest [-k case_name_pattern]
19#    eg. pytest -k file 执行方法名含 file 的用例
20
21# 打包
22# pip install pyinstaller
23# prepare assert source dir includes your data files
24# pyi-makespec  -D --add-data assert:assert dev_hdc_test.py
25# pyinstaller dev_hdc_test.spec
26# 执行 dev_hdc_test.exe
27
28import csv
29import hashlib
30import json
31import logging
32import os
33import random
34import re
35import stat
36import shutil
37import subprocess
38import sys
39import threading
40import time
41from multiprocessing import Process
42
43import pytest
44import pkg_resources
45
46
class GP(object):
    """ Global Parameters

    customize here !!!

    All values are class attributes shared by the whole script; the
    classmethods below read and update them in place. ``dump`` / ``load``
    persist them as JSON in ``.hdctester.conf`` in the current directory.
    """
    hdc_exe = "hdc"  # executable used to build every hdc command line
    local_path = "resource"  # local directory holding the test files
    remote_path = "data/local/tmp"  # directory used on the device side
    remote_ip = "auto"  # device ip for tcp mode
    remote_port = 8710  # device port for tcp mode
    hdc_head = "hdc"  # command prefix, becomes "<hdc_exe> -t <target>"
    device_name = ""  # selected target identifier
    targets = []  # result of the last "list targets" call
    tmode = "usb"  # connect type: "usb" or "tcp"
    changed_testcase = "n"
    testcase_path = "ts_windows.csv"
    loaded_testcase = 0

    @classmethod
    def init(cls):
        """Configure logging, then restore the saved options or ask for new ones."""
        # basicConfig only takes effect the first time it is called, so the
        # root logger is configured exactly once here.
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
                            datefmt='%d %b %Y %H:%M:%S',
                            )

        if os.path.exists(".hdctester.conf"):
            cls.load()
            cls.start_host()
            cls.list_targets()
        else:
            cls.set_options()
            cls.print_options()
            cls.start_host()
            cls.dump()
        return

    @classmethod
    def start_host(cls):
        """Run ``<hdc_exe> start`` and return its exit code."""
        cmd = f"{cls.hdc_exe} start"
        res = subprocess.call(cmd.split())
        return res

    @classmethod
    def list_targets(cls):
        """Refresh ``cls.targets`` from ``<hdc_exe> list targets``.

        Returns True on success; on failure stores a single placeholder
        entry in ``cls.targets`` and returns False.
        """
        try:
            targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
        except (OSError, IndexError):
            cls.targets = ["failed to auto detect device"]
            return False
        cls.targets = [t.decode() for t in targets]
        return True

    @classmethod
    def get_device(cls):
        """Pick the device to test and update ``device_name`` / ``hdc_head``.

        Asks the user to choose when several targets are listed. Returns
        False when no usable device was found, True otherwise.
        """
        cls.start_host()
        cls.list_targets()
        if len(cls.targets) > 1:
            print("Multiple device detected, please select one:")
            for i, t in enumerate(cls.targets):
                print(f"{i+1}. {t}")
            print("input the nums of the device above:")
            cls.device_name = cls.targets[int(input()) - 1]
        else:
            cls.device_name = cls.targets[0]
        if cls.device_name == "failed to auto detect device":
            print("No device detected, please check your device connection")
            return False
        elif cls.device_name == "[empty]":
            print("No hdc device detected.")
            return False
        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
        return True

    @classmethod
    def dump(cls):
        """Write every plain class attribute to ``.hdctester.conf`` as JSON."""
        try:
            os.remove(".hdctester.conf")
        except OSError:
            pass
        # Keep data attributes only: skip dunder entries and the classmethods.
        content = {
            key: value for key, value in cls.__dict__.items()
            if not key.startswith("__") and not isinstance(value, classmethod)
        }
        json_str = json.dumps(content)
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        # fdopen takes ownership of the descriptor and closes it on exit,
        # even when the write fails.
        with os.fdopen(os.open(".hdctester.conf", flags, 0o755), "wb") as conf:
            conf.write(json_str.encode())
        return True

    @classmethod
    def load(cls):
        """Restore the options written by ``dump`` from ``.hdctester.conf``.

        A key missing from the file leaves the current value untouched.
        """
        keys = (
            "hdc_exe", "local_path", "remote_path", "remote_ip", "remote_port",
            "hdc_head", "tmode", "device_name", "changed_testcase",
            "testcase_path", "loaded_testcase",
        )
        with open(".hdctester.conf") as f:
            content = json.load(f)
        for key in keys:
            setattr(cls, key, content.get(key, getattr(cls, key)))
        return True

    @classmethod
    def print_options(cls):
        """Print the current option values, one right-aligned label per line."""
        rows = (
            ("hdc execution", cls.hdc_exe),
            ("local storage path", cls.local_path),
            ("remote storage path", cls.remote_path),
            ("remote ip", cls.remote_ip),
            ("remote port", cls.remote_port),
            ("device name", cls.device_name),
            ("connect type", cls.tmode),
            ("hdc head", cls.hdc_head),
            ("changed testcase", cls.changed_testcase),
            ("testcase path", cls.testcase_path),
            ("loaded testcase", cls.loaded_testcase),
        )
        info = "HDC Tester Default Options: \n\n" + "".join(
            f"{label.rjust(20, ' ')}: {value}\n" for label, value in rows
        )
        print(info)

    @classmethod
    def tconn_tcp(cls):
        """Run ``tconn <remote_ip>:<remote_port>``; True when "Connect OK" is reported."""
        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
        return "Connect OK" in res

    @classmethod
    def set_options(cls):
        """Interactively override the options; an empty answer keeps the current value.

        Returns False only when tcp mode is selected and the connection
        fails, True otherwise.
        """
        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
            cls.hdc_exe = opt
        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
            cls.local_path = opt
        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
            cls.remote_path = opt
        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
            cls.remote_ip = opt
        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
            cls.remote_port = int(opt)
        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
            cls.device_name = opt
        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
            cls.tmode = opt
        if cls.tmode == "usb":
            ret = cls.get_device()
            if ret:
                print("USB device detected.")
        elif cls.tconn_tcp():
            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
        else:
            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
            return False
        return True

    @classmethod
    def change_testcase(cls):
        """Ask whether to change the testcase file; False when the answer is "n"."""
        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
            cls.changed_testcase = opt
            if opt == "n":
                return False
        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
            cls.testcase_path = os.path.join(opt)
        cls.print_options()
        return True

    @classmethod
    def load_testcase(cls):
        """Placeholder: prints a notice and always returns False."""
        print("this fuction will coming soon.")
        return False

    @classmethod
    def get_version(cls):
        """Return the version string of this test script."""
        version = "v1.0.4a"
        return version
236
237
def pytest_run(args):
    """Run the pytest suite ``args.count`` times and print a summary.

    Returns early (after waiting for ENTER) when one of the required
    package files is missing from ``GP.local_path``. ``args`` must provide
    ``count``, ``verbose`` and ``desc``.
    """
    required_files = (
        "entry-default-signed-debug.hap",
        "libA_v10001.hsp",
        "libB_v10001.hsp",
    )
    for file in required_files:
        if not os.path.exists(os.path.join(GP.local_path, file)):
            print(f"No {file} File!")
            print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。")
            print("Please unzip package.zip to resource directory, please rerun after operation.")
            input("[ENTER]")
            return
    gen_package_dir()
    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    if args.count is not None:
        for i in range(args.count):
            # Show a 1-based counter so the last round reads "N/N".
            print(f"------------The {i + 1}/{args.count} Test-------------")
            pytest_args = [
                '--verbose', args.verbose,
                '--report=report.html',
                f"--title=HDC Test Report {GP.get_version()}",
                '--tester=tester001',
                '--template=1',
                f"--desc={args.verbose}:{args.desc}",
            ]
            pytest.main(pytest_args)
    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime())
    report_dir = os.path.join(os.getcwd(), "reports")
    report_file = os.path.join(report_dir, f"{report_time}report.html")
    print(f"Test over, the script version is {GP.get_version()},"
        f" start at {start_time}, end at {end_time} \n"
        f"=======>{report_file} is saved. \n"
    )
    input("=======>press [Enter] key to Show logs.")
274
275
def rmdir(path):
    """Remove the file or directory tree at ``path``.

    On win32 this uses os.remove / shutil.rmtree, elsewhere ``rm -rf``.
    An OSError is reported on stdout instead of being raised.
    """
    try:
        if sys.platform == "win32":
            if os.path.isfile(path):
                os.remove(path)
            else:
                shutil.rmtree(path)
        else:
            # Hand the path over as a single argv entry so that paths
            # containing whitespace are not split into several arguments.
            subprocess.call(["rm", "-rf", path])
    except OSError:
        print(f"Error: {path} : cannot remove")
288
289
def get_local_path(path):
    """Return ``path`` joined onto the configured local resource directory."""
    base_dir = GP.local_path
    return os.path.join(base_dir, path)
292
293
def get_remote_path(path):
    """Return ``path`` appended to the configured remote directory with "/"."""
    return "{}/{}".format(GP.remote_path, path)
296
297
def get_local_md5(local):
    """Return the hex MD5 digest of the file at ``local``, read in 4096-byte blocks."""
    digest = hashlib.md5()
    with open(local, "rb") as stream:
        while block := stream.read(4096):
            digest.update(block)
    return digest.hexdigest()
304
305
def check_shell(cmd, pattern=None, fetch=False):
    """Run ``cmd`` behind ``GP.hdc_head`` and evaluate the outcome.

    * truthy ``pattern``: return whether ``pattern`` occurs in the output;
    * otherwise truthy ``fetch``: return the decoded output;
    * otherwise: return True when the command exits with status 0
      (a non-zero status raises from ``subprocess.check_call``).
    """
    full_cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {full_cmd}")
    argv = full_cmd.split()

    if not pattern and not fetch:
        # Only the exit status matters here.
        return subprocess.check_call(argv) == 0

    output = subprocess.check_output(argv).decode()
    if not pattern:
        print(f"--> output: {output}")
        return output

    found = pattern in output
    print(f"--> output: {output}")
    print(f"--> pattern [{pattern}] {'FOUND' if found else 'NOT FOUND'} in output")
    return found
321
322
def get_shell_result(cmd, pattern=None, fetch=False):
    """Run ``cmd`` behind ``GP.hdc_head`` and return its decoded output.

    ``pattern`` and ``fetch`` are accepted but not used.
    """
    full_cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {full_cmd}")
    raw_output = subprocess.check_output(full_cmd.split())
    return raw_output.decode()
327
328
def check_rate(cmd, expected_rate):
    """Return True when the rate reported by ``cmd`` exceeds ``expected_rate``.

    The rate is the number found between "rate:" and "kB/s" in the output.
    """
    report = get_shell_result(cmd)
    after_label = report.split("rate:")[1]
    rate_text = after_label.split("kB/s")[0]
    return float(rate_text) > expected_rate
333
334
def check_dir(local, remote, is_single_dir=False):
    """Compare the MD5 of every file under ``remote`` with its local copy.

    The remote checksums come from ``md5sum`` run on the device. For each
    reported file the matching local path is built under ``local`` (when
    ``is_single_dir`` is set) or under ``GP.local_path`` (otherwise).
    Returns True only when every checksum matches.
    """
    def _get_md5sum(remote, is_single_dir=False):
        if is_single_dir:
            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
        else:
            # The backslash is escaped explicitly; "\;" is an invalid escape.
            cmd = f"{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \\;"
        return subprocess.check_output(cmd.split()).decode()

    def _calculate_md5(file_path):
        md5 = hashlib.md5()
        try:
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    md5.update(chunk)
            return md5.hexdigest()
        except PermissionError:
            return "PermissionError"
        except FileNotFoundError:
            return "FileNotFoundError"

    print("remote:" + remote)
    # NOTE(review): is_single_dir is not forwarded, so the "find" form of the
    # command is always used -- confirm whether that is intended.
    output = _get_md5sum(remote)
    print(output)

    all_matched = True
    for line in output.splitlines():
        if len(line) < 32: # length of MD5
            continue
        expected_md5, file_name = line.split()[:2]
        if is_single_dir:
            file_name = file_name.replace(f"{remote}", "")
            file_path = os.path.join(os.getcwd(), local) + file_name
        else:
            if GP.remote_path in remote:
                # GP defines no remote_dir_path attribute; fall back to
                # remote_path unless one has been set on the class elsewhere.
                prefix = getattr(GP, "remote_dir_path", GP.remote_path)
            else:
                prefix = remote
            # Use the host separator so the path is valid on every platform.
            file_name = file_name.split(prefix)[1].replace("/", os.sep)
            file_path = os.path.join(os.getcwd(), GP.local_path) + file_name
        print(file_path)
        actual_md5 = _calculate_md5(file_path)
        print(f"Expected: {expected_md5}")
        print(f"Actual: {actual_md5}")
        if actual_md5 == expected_md5:
            print(f"MD5 matched {file_name}")
        else:
            print(f"[Fail]MD5 mismatch for {file_name}")
            all_matched = False

    return all_matched
383
384
385def _check_file(local, remote):
386    if remote.startswith("/proc"):
387        local_size = os.path.getsize(local)
388        if local_size > 0:
389            return True
390        else:
391            return False
392    else:
393        cmd = f"shell md5sum {remote}"
394        local_md5 = get_local_md5(local)
395        return check_shell(cmd, local_md5)
396
397
def _check_app_installed(bundle, is_shared=False):
    """Return whether ``bundle`` appears in the ``bm dump`` / ``bm dump-shared`` listing."""
    if is_shared:
        dump = "dump-shared"
    else:
        dump = "dump"
    return check_shell(f"shell bm {dump} -a", bundle)
402
403
def check_hdc_targets():
    """Return whether ``GP.device_name`` shows up in the ``list targets`` output."""
    # check_shell already prepends GP.hdc_head, so only the sub-command is
    # passed here; adding the prefix again would duplicate it.
    cmd = "list targets"
    print(GP.device_name)
    return check_shell(cmd, GP.device_name)
408
409
def check_file_send(local, remote):
    """Send ``local`` to ``remote`` and verify the transferred file."""
    src = os.path.join(GP.local_path, local)
    dst = f"{GP.remote_path}/{remote}"
    sent = check_shell(f"file send {src} {dst}")
    if not sent:
        return sent
    return _check_file(src, dst)
415
416
def check_file_recv(remote, local):
    """Receive ``remote`` into ``local`` and verify the transferred file."""
    dst = os.path.join(GP.local_path, local)
    src = f"{GP.remote_path}/{remote}"
    received = check_shell(f"file recv {src} {dst}")
    if not received:
        return received
    return _check_file(dst, src)
422
423
def check_app_install(app, bundle, args=""):
    """Install ``app`` and check the outcome.

    A ".hap" installed with "-s", or a ".hsp" installed without options,
    is expected to be rejected; any other combination must install
    successfully and be listed afterwards.
    """
    package = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {package}"

    hap_as_shared = args == "-s" and package.endswith(".hap")
    hsp_as_plain = args == "" and package.endswith(".hsp")
    if hap_as_shared or hsp_as_plain:
        return check_shell(install_cmd, "failed to install bundle")

    if not check_shell(install_cmd, "successfully"):
        return False
    return _check_app_installed(bundle, "s" in args)
431
432
def check_app_uninstall(bundle, args=""):
    """Uninstall ``bundle`` and confirm it is no longer listed."""
    if not check_shell(f"uninstall {args} {bundle}", "successfully"):
        return False
    still_installed = _check_app_installed(bundle, "s" in args)
    return not still_installed
436
437
def check_app_install_multi(tables, args=""):
    """Install several packages with one command and check the outcome.

    ``tables`` maps package file names (relative to ``GP.local_path``) to
    bundle names. The install is expected to fail when a ".hap" is
    installed with "-s", when ".hap" and ".hsp" are mixed, or when no
    ".hap" is installed without options; otherwise it must succeed and
    every bundle must be listed afterwards.
    """
    apps = [os.path.join(GP.local_path, app) for app in tables]
    bundles = list(tables.values())

    apps_str = " ".join(apps)
    install_cmd = f"install {args} {apps_str}"

    # Plain substring tests: as a regex, ".hap" would let the dot match any
    # character and accept names such as "mishap".
    has_hap = ".hap" in apps_str
    has_hsp = ".hsp" in apps_str
    expect_failure = (
        (args == "-s" and has_hap)
        or (has_hsp and has_hap)
        or (args == "" and not has_hap)
    )

    if expect_failure:
        return bool(check_shell(install_cmd, "failed to install bundle"))

    if not check_shell(install_cmd, "successfully"):
        return False
    for bundle in bundles:
        if not _check_app_installed(bundle, "s" in args):
            return False
    return True
462
463
def check_app_uninstall_multi(tables, args=""):
    """Uninstall every bundle in ``tables``; stop and return False at the first failure."""
    return all(
        check_app_uninstall(bundle, args)
        for _app, bundle in tables.items()
    )
470
471
def check_hdc_cmd(cmd, pattern=None, **args):
    """Run an hdc sub-command and apply the check that fits its kind.

    * "file ...": the transfer must report "FileTransfer finish" and the
      transferred file or directory must match its source;
    * "install ...": must succeed and list the bundle given as ``bundle=``;
    * "uninstall ...": must succeed and no longer list the bundle;
    * anything else is forwarded to ``check_shell`` unchanged.
    """
    if cmd.startswith("file"):
        if not check_shell(cmd, "FileTransfer finish"):
            return False
        # The last two words are the endpoints; their order depends on the
        # transfer direction.
        first, second = cmd.split()[-2:]
        if cmd.startswith("file send"):
            local, remote = first, second
        else:
            remote, local = first, second
        verify = _check_file if os.path.isfile(local) else check_dir
        return verify(local, remote)

    if cmd.startswith("install"):
        bundle = args.get("bundle", "invalid")
        opt = " ".join(cmd.split()[1:-1])
        if not check_shell(cmd, "successfully"):
            return False
        return _check_app_installed(bundle, "s" in opt)

    if cmd.startswith("uninstall"):
        words = cmd.split()
        bundle = words[-1]
        opt = " ".join(words[1:-1])
        if not check_shell(cmd, "successfully"):
            return False
        return not _check_app_installed(bundle, "s" in opt)

    return check_shell(cmd, pattern, **args)
496
497
def check_soft_local(local_source, local_soft, remote):
    """Send the local link ``local_soft`` and compare ``remote`` with ``local_source``."""
    transferred = check_shell(f"file send {local_soft} {remote}", "FileTransfer finish")
    return _check_file(local_source, remote) if transferred else False
503
504
def check_soft_remote(remote_source, remote_soft, local_recv):
    """Create a remote symlink, receive it and compare the result with its source.

    The result of the ``ln -s`` step itself is not checked.
    """
    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
    transferred = check_shell(f"file recv {remote_soft} {local_recv}", "FileTransfer finish")
    if transferred:
        return _check_file(local_recv, get_remote_path(remote_source))
    return False
511
512
def switch_usb():
    """Switch the device to usb mode and point ``GP.hdc_head`` at the usb target.

    Waits three seconds after issuing the command. Returns the command result.
    """
    switched = check_hdc_cmd("tmode usb")
    time.sleep(3)
    if not switched:
        return switched
    GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
    return switched
519
520
def copy_file(src, dst):
    """Copy ``src`` to ``dst`` with metadata; failures are printed, never raised."""
    try:
        shutil.copy2(src, dst)
        print(f"File copied successfully from {src} to {dst}")
    except OSError as io_error:
        print(f"Unable to copy file. {io_error}")
    except Exception as other_error:
        print(f"Unexpected error: {other_error}")
529
530
def switch_tcp():
    """Switch the device to tcp mode and connect to it.

    With ``GP.remote_ip`` empty the check is skipped (returns True). With
    "auto" the ip is taken from the device's ``ifconfig`` output first;
    when nothing is found the check is skipped as well. On a successful
    connect ``GP.hdc_head`` is pointed at ``ip:port``.
    """
    if not GP.remote_ip: # skip tcp check
        print("!!! remote_ip is none, skip tcp check !!!")
        return True

    if GP.remote_ip == "auto":
        ifconfig_out = check_hdc_cmd(
            "shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"",
            fetch=True,
        )
        if not ifconfig_out:
            print("!!! device ip not found, skip tcp check !!!")
            return True
        # The address is the first word after the first ":" in the output.
        after_colon = ifconfig_out.split(":")[1]
        GP.remote_ip = after_colon.split()[0]
        print(f"fetch remote ip: {GP.remote_ip}")

    if check_hdc_cmd(f"tmode port {GP.remote_port}"):
        time.sleep(3)

    connected = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
    if connected:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
    return connected
549
550
def select_cmd():
    """Show the menu until the user enters a single digit from 1 to 5; return it as a string."""
    menu_lines = (
        "1) Proceed tester\n",
        "2) Customize tester\n",
        "3) Setup files for transfer\n",
        "4) Load custom testcase(default unused) \n",
        "5) Exit\n",
        ">> ",
    )
    msg = "".join(menu_lines)
    valid_choices = {"1", "2", "3", "4", "5"}

    choice = input(msg).strip()
    while choice not in valid_choices:
        choice = input(msg).strip()
    return choice
563
564
def gen_file(path, size):
    """Create ``path`` filled with random bytes, replacing any previous content.

    Data is written in 1024-byte blocks until at least ``size`` bytes
    have been written, so the final length is ``size`` rounded up to a
    multiple of 1024 (0 stays 0).
    """
    path = os.path.abspath(path)
    # O_TRUNC drops stale content of an existing, larger file; O_BINARY
    # (only defined on Windows) prevents newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    # fdopen owns the descriptor and closes it even if a write fails.
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        written = 0
        while written < size:
            out.write(os.urandom(1024))
            written += 1024
574
575
def gen_zero_file(path, size):
    """Create ``path`` holding ``size`` ASCII "0" characters, replacing any previous content."""
    # O_TRUNC drops stale content when the file already exists and is larger.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    # fdopen owns the descriptor and closes it even if the write fails.
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        out.write(b'0' * size)
580
581
def create_file_with_size(path, size):
    """Create ``path`` holding ``size`` NUL bytes, replacing any previous content."""
    # O_TRUNC drops stale content when the file already exists and is larger.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    # fdopen owns the descriptor and closes it even if the write fails.
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        out.write(b'\0' * size)
586
587
def gen_file_set():
    """Populate ``GP.local_path`` with the files and directories used by the transfer tests."""
    def in_local(*parts):
        return os.path.join(GP.local_path, *parts)

    def ensure_dir(directory):
        if not os.path.exists(directory):
            os.mkdir(directory)

    # Random-content files of increasing size.
    random_files = (
        ("empty", 0),
        ("small", 102400),
        ("medium", 200 * 1024 ** 2),
        ("large", 2 * 1024 ** 3),
    )
    for label, size in random_files:
        print(f"generating {label} file ...")
        gen_file(in_local(label), size)

    print("generating soft link ...")
    try:
        os.symlink("small", in_local("soft_small"))
    except FileExistsError:
        print("soft_small already exists")

    print("generating text file ...")
    gen_zero_file(in_local("word_100M.txt"), 100 * 1024 ** 2)

    print("generating package dir ...")
    ensure_dir(in_local("package"))
    for index in range(1, 6):
        gen_file(in_local("package", f"fake.hap.{index}"), 20 * 1024 ** 2)

    print("generating deep dir ...")
    deepth = 4
    deep_path = GP.local_path
    # "deep_dir" followed by deep_dir0 .. deep_dir3, each nested in the previous.
    for level in ["deep_dir"] + [f"deep_dir{deep}" for deep in range(deepth)]:
        deep_path = os.path.join(deep_path, level)
        ensure_dir(deep_path)
    gen_file(os.path.join(deep_path, "deep"), 102400)

    print("generating dir with file ...")
    problem_dir = in_local("problem_dir")
    rmdir(problem_dir)
    os.mkdir(problem_dir)
    gen_file(os.path.join(problem_dir, "small2"), 102400)

    fuzz_count = 47 # 47 is the count that circulated the file transfer
    data_unit = 1024 # 1024 is the size that circulated the file transfer
    data_extra = 936 # 936 is the size that cased the extra file transfer
    for step in range(fuzz_count):
        payload_size = step * data_unit + data_extra
        create_file_with_size(
            os.path.join(problem_dir, f"file_{payload_size}.dat"),
            payload_size,
        )

    print("generating empty dir ...")
    empty_dir = in_local("empty_dir")
    rmdir(empty_dir)
    os.mkdir(empty_dir)

    print("generating version file ...")
    gen_file(in_local(GP.get_version()), 0)
650
651
def prepare_source():
    """Regenerate the local test files unless the current version marker exists.

    The marker is a file named after ``GP.get_version()`` inside
    ``GP.local_path``. When it is missing, everything in that directory
    except ".hap" / ".hsp" packages is removed and the file set is rebuilt.
    """
    version = GP.get_version()
    if os.path.exists(os.path.join(GP.local_path, version)):
        # Print the version string itself, not the bound method object.
        print(f"hdc test version is {version}, check ok, skip prepare.")
        return
    print(f"in prepare {GP.local_path},wait for 2 mins.")

    if os.path.exists(GP.local_path):
        # Walk local_path and delete everything except the hap / hsp packages.
        for file in os.listdir(GP.local_path):
            if file.endswith((".hap", ".hsp")):
                continue
            rmdir(os.path.join(GP.local_path, file))
    else:
        os.mkdir(GP.local_path)

    gen_file_set()
670
671
def gen_package_dir():
    """Recreate ``app_dir`` under ``GP.local_path`` and copy the debug hap into it."""
    print("generating app dir ...")
    app_dir = os.path.join(GP.local_path, "app_dir")
    rmdir(app_dir)
    os.mkdir(app_dir)

    app = os.path.join(GP.local_path, "entry-default-signed-debug.hap")
    if os.path.exists(app):
        copy_file(app, app_dir)
    else:
        print(f"Source file {app} does not exist.")
683
684
def setup_tester():
    """Run the interactive setup menu.

    Returns True when the user chooses to proceed with the tests, False
    when the user exits or a setup step fails.
    """
    while True:
        GP.print_options()
        choice = int(select_cmd())
        if choice == 1:
            return True
        if choice == 2:
            if not GP.set_options():
                return False
            GP.dump()
            continue
        if choice == 3:
            prepare_source()
            continue
        if choice == 4 and GP.load_testcase():
            continue
        # Exit request, failed testcase load, or an unexpected choice.
        return False
704
705
def load_testcase():
    """Report the testcase loading status; returns True on success, False otherwise."""
    # NOTE(review): GP.load_testcase is tested as an attribute, not called,
    # so this value is always truthy and the failure branch below cannot
    # run -- confirm whether a call was intended.
    loader = GP.load_testcase
    if loader:
        print("load testcase success")
        return True
    print("load testcase failed")
    return False
712
713
def check_library_installation(library_name):
    """Return 0 when the distribution ``library_name`` is installed, 1 otherwise.

    ``library_name`` is a plain distribution name as used with
    ``pip install``. A missing distribution is reported on stdout
    together with the install command.
    """
    # Imported locally: the standard-library replacement for the deprecated
    # pkg_resources lookup.
    from importlib import metadata

    try:
        metadata.distribution(library_name)
        return 0
    except metadata.PackageNotFoundError:
        print(f"\n\n{library_name} is not installed.\n\n")
        print("try to use command below:")
        print(f"pip install {library_name}")
        return 1
723
724
def check_subprocess_cmd(cmd, process_num, timeout):
    """Start ``process_num`` copies of ``cmd`` and wait for each of them.

    Every process gets up to ``timeout`` seconds to finish; one that is
    still running after that is killed. Nothing is returned.
    """
    procs = [subprocess.Popen(cmd.split()) for _ in range(process_num)]
    for proc in procs:
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed process so it does not linger as a zombie.
            proc.wait()
733
734
def check_process_mixed(process_num, timeout, local, remote):
    """Run staggered parallel sends, then staggered parallel receives.

    For each of ``process_num`` rounds and each of the small / medium /
    empty files, one send process and one receive process are prepared.
    ``timeout``, ``local`` and ``remote`` are accepted but not used.
    """
    def _pause():
        time.sleep(random.uniform(0, 1))

    senders = []
    receivers = []
    sizes = {"small", "medium", "empty"}
    for round_no in range(process_num):
        for size in sizes:
            remote_name = get_remote_path(f'it_{size}_mix_{round_no}')
            cmd_send = f"file send {get_local_path(f'{size}')} {remote_name}"
            cmd_recv = f"file recv {remote_name} {get_local_path(f'recv_{size}_mix_{round_no}')}"
            senders.append(Process(target=check_hdc_cmd, args=(cmd_send, )))
            receivers.append(Process(target=check_hdc_cmd, args=(cmd_recv, )))
            logging.info(f"RESULT:{cmd_send}")  # log the prepared send command

    # Start the senders a random moment apart, then wait for all of them.
    for worker in senders:
        worker.start()
        _pause()
    for worker in senders:
        worker.join()

    # Same for the receivers, with an extra random pause after each join.
    for worker in receivers:
        worker.start()
        _pause()
    for worker in receivers:
        worker.join()
        _pause()
762
763
def execute_lines_in_file(file_path):
    """Run the commands listed in ``file_path``.

    Each non-blank line has the form ``<repeat count>,<command>``. A
    command starting with "hdc" has that prefix replaced by
    ``GP.hdc_head``. A missing file is first created with the single
    entry "1,hdc shell ls". Output of every run is logged.
    """
    if not os.path.exists(file_path):
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(file_path, flags, modes), 'w') as file:
            file.write("1,hdc shell ls")
    with open(file_path, 'r') as file:
        lines = file.readlines()

    for line in lines:
        if not line.strip():
            # Blank lines (e.g. a trailing newline) carry no command.
            continue
        # Split on the first comma only so commas inside the command survive.
        test_time, test_cmd = line.split(',', 1)
        match = re.search(r"^hdc", test_cmd)
        if match:
            # Drop the leading "hdc" only; later occurrences stay untouched.
            result = test_cmd[match.end():].lstrip()
            test_cmd = f"{GP.hdc_head} {result}"

        for i in range(int(test_time)):
            logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}")
            output = subprocess.check_output(test_cmd.split()).decode()
            logging.info(f"RESULT:{output}")  # log the command output
785
786
def make_multiprocess_file(local, remote, mode, num, task_type):
    """Run ``num`` parallel file transfers and verify the results.

    ``mode`` is "send" or "recv"; ``task_type`` is "file" (each transfer
    uses its own ``_<i>`` suffixed target) or "dir" (every transfer uses
    the same paths). Returns False when ``num`` < 1, when ``mode`` or
    ``task_type`` is unknown, when a transfer does not report
    "FileTransfer finish", or when a checksum comparison fails.
    """
    if num < 1:
        return False
    if task_type not in ("file", "dir") or mode not in ("send", "recv"):
        return False

    if task_type == "file":
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
    else:
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]

    print(file_list[0])
    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list]
    print(f"{mode} target {num} start")
    while p_list:
        # Iterate over a snapshot: finished processes are removed from
        # p_list inside the loop.
        for p in list(p_list):
            if p.poll() is None:
                continue
            stdout, stderr = p.communicate(timeout=512) # timeout wait 512s
            if stderr:
                print(f"{stderr.decode()}")
            if stdout:
                print(f"{stdout.decode()}")
            if "FileTransfer finish" not in stdout.decode():
                return False
            p_list.remove(p)

    all_ok = True
    if task_type == "file":
        for i in range(num):
            local_file = local if mode == "send" else f"{local}_{i}"
            if not _check_file(local_file, f"{remote}_{i}"):
                all_ok = False
    else:
        # Resolve the compared paths once; building them inside the loop
        # would nest the local path one level deeper on every pass.
        if mode == "send":
            check_local = local
            check_remote = f"{remote}/{os.path.basename(local)}"
        else:
            check_local = os.path.join(local, os.path.basename(remote))
            check_remote = f"{remote}"
        for _ in range(num):
            if not check_dir(check_local, check_remote, is_single_dir=True):
                all_ok = False
    return all_ok
847
848
def hdc_get_key(cmd):
    """Run ``cmd`` behind ``GP.hdc_head`` and return its decoded output."""
    argv = f"{GP.hdc_head} {cmd}".split()
    return subprocess.check_output(argv).decode()
853
854
def start_subprocess_cmd(cmd, num, assert_out):
    """Run ``num`` parallel copies of an hdc command and check their output.

    Returns False when ``num`` < 1 or when ``assert_out`` is given and
    missing from the stdout of any finished process; True otherwise.
    stderr is logged as an error, stdout as info.
    """
    if num < 1:
        return False
    full_cmd = f"{GP.hdc_head} {cmd}"
    p_list = [
        subprocess.Popen(full_cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for _ in range(num)
    ]
    logging.info(f"{cmd} target {num} start")
    while p_list:
        # Iterate over a snapshot: finished processes are removed from
        # p_list inside the loop.
        for p in list(p_list):
            if p.poll() is None:
                continue
            stdout, stderr = p.communicate(timeout=512)
            if stderr:
                logging.error(f"{stderr.decode()}")
            if stdout:
                logging.info(f"{stdout.decode()}")
            if assert_out is not None and assert_out not in stdout.decode():
                return False
            p_list.remove(p)
    return True
873
874
def check_hdc_version(cmd, version):
    """Check that the installed hdc is at least ``version``.

    ``version`` is a string containing "Ver: x.y.z"; the digits after
    "Ver: " are concatenated and compared as one hexadecimal number with
    the output of ``<hdc_head> -v``. ``cmd`` is accepted but not used.
    Returns True / False for the comparison, or None when ``version`` is None.
    """

    def _convert_version_to_hex(_version):
        parts = _version.split("Ver: ")[1].split('.')
        hex_version = ''.join(parts)
        return int(hex_version, 16)

    if version is None:
        # Nothing to compare against; parsing None would raise.
        return None

    expected_version = _convert_version_to_hex(version)
    cmd = f"{GP.hdc_head} -v"
    print(f"\nexecuting command: {cmd}")
    output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
    real_version = _convert_version_to_hex(output)
    is_new_enough = expected_version <= real_version
    print(f"--> output: {output}")
    print(f"--> your local [{version}] is"
        f" {'' if is_new_enough else 'too old to'} fit the version [{output}]"
    )
    return is_new_enough
893
894
def check_cmd_time(cmd, pattern, duration, times):
    """Run an hdc command repeatedly and check its average run time.

    Args:
        cmd: hdc sub-command and arguments, without the ``GP.hdc_head`` prefix.
        pattern: when ``None`` the command is run directly through
            ``subprocess.check_output``; otherwise each run is delegated to
            ``check_shell(cmd, pattern, fetch=False)``.
        duration: upper bound, in milliseconds, for the average time of one
            run. ``None`` selects 180 ms (150 ms baseline plus 20 %).
        times: number of runs; must be at least 1.

    Returns:
        ``True`` when the mean time per run is below ``duration``; ``False``
        when it is not, or when ``times`` is less than 1.
    """
    if times < 1:
        print("times should be bigger than 0.")
        return False

    from statistics import median

    # The printed start stamp stays wall-clock milliseconds; the intervals
    # themselves use a monotonic clock so system clock adjustments cannot
    # distort them.
    print(f"{cmd} start {time.time() * 1000}")
    samples = []
    for _ in range(times):
        began = time.perf_counter()
        if pattern is None:
            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
        else:
            check_shell(cmd, pattern, fetch=False)
        samples.append((time.perf_counter() - began) * 1000)

    # Compute the maximum, minimum and median of the per-run times.
    max_value = max(samples)
    min_value = min(samples)
    median_value = median(samples)

    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")

    # Average only the measured runs so that the statistics and printing
    # above do not count towards the command's time.
    timecost = sum(samples) / times
    print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")

    if duration is None:
        # 150ms is the baseline time cost for an "hdc shell xxx" command; up
        # to 20% extra is tolerated to allow for system load.
        duration = 150 * 1.2
    return timecost < duration
936
937
def check_rom(baseline):
    """Check the combined on-device size of hdcd and libhdc against a limit.

    The sizes are read with ``du`` through ``hdc shell``. The library is
    looked up under ``system/lib`` first and ``system/lib64`` second; if both
    outputs contain the word "directory" (presumably a "No such file or
    directory" error) its size counts as 0. The outcome is also stored in
    ``GP.hdcd_rom``, either as ``"<size> KB"`` or as ``"error"``.

    Args:
        baseline: maximum allowed total size; ``None`` selects 2200, the
            baseline in KB for hdcd and libhdc.dylib.so together.

    Returns:
        ``True`` when the total size does not exceed ``baseline``; ``False``
        when it does or when a size could not be parsed.
    """
    unparsable = -9999 * 1024  # large negative marker that drags the total below zero

    def _size_from_du(du_output):
        # The size is the first tab-separated field of the du output.
        leading_field = du_output.split('\t')[0]
        try:
            return int(leading_field)
        except ValueError:
            print(f"try get size value error, from {du_output}")
            return unparsable

    def _du_on_device(device_path):
        query = f"{GP.hdc_head} shell du {device_path}"
        return subprocess.check_output(query.split()).decode()

    limit = 2200 if baseline is None else baseline

    hdcd_kb = _size_from_du(_du_on_device("system/bin/hdcd"))

    # Take the first library location whose du output is not an error.
    lib_kb = 0
    for lib_path in ("system/lib/libhdc.dylib.so", "system/lib64/libhdc.dylib.so"):
        lib_output = _du_on_device(lib_path)
        if "directory" not in lib_output:
            lib_kb = _size_from_du(lib_output)
            break

    total_kb = hdcd_kb + lib_kb
    if total_kb < 0:
        GP.hdcd_rom = "error"
        return False
    GP.hdcd_rom = f"{total_kb} KB"

    within_limit = not total_kb > limit
    if within_limit:
        print(f"rom size is {total_kb}, underlimit baseline {limit}")
    else:
        print(f"rom size is {total_kb}, overlimit baseline {limit}")
    return within_limit
978
979
def run_command_with_timeout(command, timeout):
    """Run ``command`` and capture its output, giving up after ``timeout`` seconds.

    Args:
        command: the command to run, passed unchanged to ``subprocess.run``.
        timeout: maximum run time in seconds.

    Returns:
        A ``(stdout, stderr)`` tuple of decoded strings when the command exits
        with status 0. Otherwise the first item is ``None`` and the second is
        ``"Command timed out"`` if the timeout expired, or the command's
        decoded stderr if it exited with a non-zero status.
    """
    try:
        completed = subprocess.run(
            command,
            check=True,
            capture_output=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return None, "Command timed out"
    except subprocess.CalledProcessError as failure:
        return None, failure.stderr.decode()
    return completed.stdout.decode(), completed.stderr.decode()
988