• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# 运行环境: python 3.10+, pytest
16
17import hashlib
18import json
19import os
20import re
21import shutil
22import subprocess
23import sys
24import time
25import functools
26
27import pytest
28import importlib
29
30
31class GP(object):
32    """ Global Parameters
33
34    customize here !!!
35    """
36    hdc_exe = "hdc"
37    local_path = "resource"
38    remote_path = "data/local/tmp"
39    remote_dir_path = "data/local/tmp/it_send_dir"
40    remote_ip = "auto"
41    remote_port = 8710
42    hdc_head = "hdc"
43    device_name = ""
44    targets = []
45    tmode = "usb"
46    changed_testcase = "n"
47    testcase_path = "ts_windows.csv"
48    loaded_testcase = 0
49    hdcd_rom = "not checked"
50    debug_app = "com.example.myapplication"
51
52    @classmethod
53    def init(cls):
54        if os.path.exists(".hdctester.conf"):
55            cls.load()
56            cls.start_host()
57            cls.list_targets()
58        else:
59            cls.set_options()
60            cls.print_options()
61            cls.start_host()
62            cls.dump()
63        return
64
65
66    @classmethod
67    def start_host(cls):
68        cmd = f"{cls.hdc_exe} start"
69        res = subprocess.call(cmd.split())
70        return res
71
72    @classmethod
73    def list_targets(cls):
74        try:
75            targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
76        except (OSError, IndexError):
77            targets = [b"failed to auto detect device"]
78            cls.targets = [targets[0].decode()]
79            return False
80        cls.targets = [t.decode() for t in targets]
81        return True
82
83
84    @classmethod
85    def get_device(cls):
86        cls.start_host()
87        cls.list_targets()
88        if len(cls.targets) > 1:
89            print("Multiple device detected, please select one:")
90            for i, t in enumerate(cls.targets):
91                print(f"{i+1}. {t}")
92            print("input the nums of the device above:")
93            cls.device_name = cls.targets[int(input()) - 1]
94        else:
95            cls.device_name = cls.targets[0]
96        if cls.device_name == "failed to auto detect device":
97            print("No device detected, please check your device connection")
98            return False
99        elif cls.device_name == "[empty]":
100            print("No hdc device detected.")
101            return False
102        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
103        return True
104
105
106    @classmethod
107    def dump(cls):
108        try:
109            os.remove(".hdctester.conf")
110        except OSError:
111            pass
112        content = filter(
113            lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items())
114        json_str = json.dumps(dict(content))
115        fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755)
116        os.write(fd, json_str.encode())
117        os.close(fd)
118        return True
119
120
121    @classmethod
122    def load(cls):
123        if not os.path.exists(".hdctester.conf"):
124            raise ConfigFileNotFoundException("No config file found, please run command [python prepare.py]")
125        with open(".hdctester.conf") as f:
126            content = json.load(f)
127            cls.hdc_exe = content.get("hdc_exe")
128            cls.local_path = content.get("local_path")
129            cls.remote_path = content.get("remote_path")
130            cls.remote_ip = content.get("remote_ip")
131            cls.hdc_head = content.get("hdc_head")
132            cls.tmode = content.get("tmode")
133            cls.device_name = content.get("device_name")
134            cls.changed_testcase = content.get("changed_testcase")
135            cls.testcase_path = content.get("testcase_path")
136            cls.loaded_testcase = content.get("load_testcase")
137        return True
138
139
140    @classmethod
141    def print_options(cls):
142        info = "HDC Tester Default Options: \n\n" \
143        + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \
144        + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \
145        + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \
146        + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \
147        + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \
148        + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \
149        + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \
150        + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \
151        + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \
152        + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \
153        + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n"
154        print(info)
155
156
157    @classmethod
158    def tconn_tcp(cls):
159        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
160        if "Connect OK" in res:
161            return True
162        else:
163            return False
164
165
166    @classmethod
167    def set_options(cls):
168        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
169            cls.hdc_exe = opt
170        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
171            cls.local_path = opt
172        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
173            cls.remote_path = opt
174        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
175            cls.remote_ip = opt
176        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
177            cls.remote_port = int(opt)
178        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
179            cls.device_name = opt
180        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
181            cls.tmode = opt
182        if cls.tmode == "usb":
183            ret = cls.get_device()
184            if ret:
185                print("USB device detected.")
186        elif cls.tconn_tcp():
187            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
188        else:
189            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
190            return False
191        return True
192
193
194    @classmethod
195    def change_testcase(cls):
196        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
197            cls.changed_testcase = opt
198            if opt == "n":
199                return False
200        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
201            cls.testcase_path = os.path.join(opt)
202        cls.print_options()
203        return True
204
205
206    @classmethod
207    def load_testcase(cls):
208        print("this fuction will coming soon.")
209        return False
210
211    @classmethod
212    def get_version(cls):
213        version = f"v1.0.5a"
214        return version
215
216
217def pytest_run():
218    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
219    pytest.main()
220    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
221    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
222    report_dir = os.path.join(os.getcwd(), "reports")
223    report_file = os.path.join(report_dir, f"{report_time}report.html")
224    print(f"Test over, the script version is {GP.get_version()},"
225        f" start at {start_time}, end at {end_time} \n"
226        f"=======>{report_file} is saved. \n"
227    )
228    input("=======>press [Enter] key to Show logs.")
229
230
231def rmdir(path):
232    try:
233        if sys.platform == "win32":
234            if os.path.isfile(path):
235                os.remove(path)
236            else:
237                shutil.rmtree(path)
238        else:
239            subprocess.call(f"rm -rf {path}".split())
240    except OSError:
241        print(f"Error: {path} : cannot remove")
242        pass
243
244
245def get_local_path(path):
246    return os.path.join(GP.local_path, path)
247
248
249def get_remote_path(path):
250    return f"{GP.remote_path}/{path}"
251
252
253def get_local_md5(local):
254    md5_hash = hashlib.md5()
255    with open(local, "rb") as f:
256        for byte_block in iter(lambda: f.read(4096), b""):
257            md5_hash.update(byte_block)
258    return md5_hash.hexdigest()
259
260
261def check_shell_any_device(cmd, pattern=None, fetch=False):
262    print(f"\nexecuting command: {cmd}")
263    if pattern: # check output valid
264        print("pattern case")
265        output = subprocess.check_output(cmd.split()).decode()
266        res = pattern in output
267        print(f"--> output: {output}")
268        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
269        return res, output
270    elif fetch:
271        output = subprocess.check_output(cmd.split()).decode()
272        print(f"--> output: {output}")
273        return True, output
274    else: # check cmd run successfully
275        print("other case")
276        return subprocess.check_call(cmd.split()) == 0, ""
277
278
279def check_shell(cmd, pattern=None, fetch=False, head=None):
280    if head is None:
281        head = GP.hdc_head
282    cmd = f"{head} {cmd}"
283    print(f"\nexecuting command: {cmd}")
284    if pattern: # check output valid
285        output = subprocess.check_output(cmd.split()).decode()
286        res = pattern in output
287        print(f"--> output: {output}")
288        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
289        return res
290    elif fetch:
291        output = subprocess.check_output(cmd.split()).decode()
292        print(f"--> output: {output}")
293        return output
294    else: # check cmd run successfully
295        return subprocess.check_call(cmd.split()) == 0
296
297
298def get_shell_result(cmd, pattern=None, fetch=False):
299    cmd = f"{GP.hdc_head} {cmd}"
300    print(f"\nexecuting command: {cmd}")
301    return subprocess.check_output(cmd.split()).decode()
302
303
304def check_rate(cmd, expected_rate):
305    send_result = get_shell_result(cmd)
306    rate = float(send_result.split("rate:")[1].split("kB/s")[0])
307    return rate > expected_rate
308
309
310def check_dir(local, remote, is_single_dir=False):
311    def _get_md5sum(remote, is_single_dir=False):
312        if is_single_dir:
313            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
314        else:
315            cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}}'
316        result = subprocess.check_output(cmd.split()).decode()
317        return result
318
319    def _calculate_md5(file_path):
320        md5 = hashlib.md5()
321        try:
322            with open(file_path, 'rb') as f:
323                for chunk in iter(lambda: f.read(4096), b""):
324                    md5.update(chunk)
325            return md5.hexdigest()
326        except PermissionError:
327            return "PermissionError"
328        except FileNotFoundError:
329            return "FileNotFoundError"
330    print("remote:" + remote)
331    output = _get_md5sum(remote)
332    print(output)
333
334    result = 1
335    for line in output.splitlines():
336        if len(line) < 32: # length of MD5
337            continue
338        expected_md5, file_name = line.split()[:2]
339        if is_single_dir:
340            file_name = file_name.replace(f"{remote}", "")
341        elif GP.remote_path in remote:
342            file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\")
343        else:
344            file_name = file_name.split(remote)[1].replace("/", "\\")
345        file_path = os.path.join(os.getcwd(), GP.local_path) + file_name  # 构建完整的文件路径
346        if is_single_dir:
347            file_path = os.path.join(os.getcwd(), local) + file_name
348        print(file_path)
349        actual_md5 = _calculate_md5(file_path)
350        print(f"Expected: {expected_md5}")
351        print(f"Actual: {actual_md5}")
352        print(f"MD5 matched {file_name}")
353        if actual_md5 != expected_md5:
354            print(f"[Fail]MD5 mismatch for {file_name}")
355            result *= 0
356
357    return (result == 1)
358
359
360def _check_file(local, remote, head=None):
361    if head is None:
362        head = GP.hdc_head
363    if remote.startswith("/proc"):
364        local_size = os.path.getsize(local)
365        if local_size > 0:
366            return True
367        else:
368            return False
369    else:
370        cmd = f"shell md5sum {remote}"
371        local_md5 = get_local_md5(local)
372        return check_shell(cmd, local_md5, head=head)
373
374
375def _check_app_installed(bundle, is_shared=False):
376    dump = "dump-shared" if is_shared else "dump"
377    cmd = f"shell bm {dump} -a"
378    return check_shell(cmd, bundle)
379
380
381def check_hdc_targets():
382    cmd = f"{GP.hdc_head} list targets"
383    print(GP.device_name)
384    return check_shell(cmd, GP.device_name)
385
386
387def check_file_send(local, remote):
388    local_path = os.path.join(GP.local_path, local)
389    remote_path = f"{GP.remote_path}/{remote}"
390    cmd = f"file send {local_path} {remote_path}"
391    return check_shell(cmd) and _check_file(local_path, remote_path)
392
393
394def check_file_recv(remote, local):
395    local_path = os.path.join(GP.local_path, local)
396    remote_path = f"{GP.remote_path}/{remote}"
397    cmd = f"file recv {remote_path} {local_path}"
398    return check_shell(cmd) and _check_file(local_path, remote_path)
399
400
401def check_app_install(app, bundle, args=""):
402    app = os.path.join(GP.local_path, app)
403    install_cmd = f"install {args} {app}"
404    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
405        return check_shell(install_cmd, "failed to install bundle")
406    else:
407        return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)
408
409
410def check_app_uninstall(bundle, args=""):
411    uninstall_cmd = f"uninstall {args} {bundle}"
412    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)
413
414
415def check_app_install_multi(tables, args=""):
416    apps = []
417    bundles = []
418    for app, bundle in tables.items() :
419        app = os.path.join(GP.local_path, app)
420        apps.append(app)
421        bundles.append(bundle)
422
423    apps_str = " ".join(apps)
424    install_cmd = f"install {args} {apps_str}"
425
426    if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str))
427        or (args == "" and 0 == apps_str.count(".hap"))):
428        if not check_shell(install_cmd, "failed to install bundle"):
429            return False
430    else:
431        if not check_shell(install_cmd, "successfully"):
432            return False
433
434        for bundle in bundles:
435            if not _check_app_installed(bundle, "s" in args):
436                return False
437
438    return True
439
440
441def check_app_uninstall_multi(tables, args=""):
442    for app, bundle in tables.items() :
443        if not check_app_uninstall(bundle, args):
444            return False
445
446    return True
447
448
449def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args):
450    if head is None:
451        head = GP.hdc_head
452    if cmd.startswith("file"):
453        if not check_shell(cmd, "FileTransfer finish", head=head):
454            return False
455        if cmd.startswith("file send"):
456            local, remote = cmd.split()[-2:]
457            if remote[-1] == '/' or remote[-1] == '\\':
458                remote = f"{remote}{os.path.basename(local)}"
459        else:
460            remote, local = cmd.split()[-2:]
461            if local[-1] == '/' or local[-1] == '\\':
462                local = f"{local}{os.path.basename(remote)}"
463        if "-b" in cmd:
464            mnt_debug_path = "mnt/debug/100/debug_hap/"
465            remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}"
466        if os.path.isfile(local):
467            return _check_file(local, remote, head=head)
468        else:
469            return check_dir(local, remote, is_single_dir=is_single_dir)
470    elif cmd.startswith("install"):
471        bundle = args.get("bundle", "invalid")
472        opt = " ".join(cmd.split()[1:-1])
473        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)
474
475    elif cmd.startswith("uninstall"):
476        bundle = cmd.split()[-1]
477        opt = " ".join(cmd.split()[1:-1])
478        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)
479
480    else:
481        return check_shell(cmd, pattern, head=head, **args)
482
483
484def check_soft_local(local_source, local_soft, remote):
485    cmd = f"file send {local_soft} {remote}"
486    if not check_shell(cmd, "FileTransfer finish"):
487        return False
488    return _check_file(local_source, remote)
489
490
491def check_soft_remote(remote_source, remote_soft, local_recv):
492    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
493    cmd = f"file recv {remote_soft} {local_recv}"
494    if not check_shell(cmd, "FileTransfer finish"):
495        return False
496    return _check_file(local_recv, get_remote_path(remote_source))
497
498
499def switch_usb():
500    res = check_hdc_cmd("tmode usb")
501    time.sleep(3)
502    if res:
503        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
504    return res
505
506
507def copy_file(src, dst):
508    try:
509        shutil.copy2(src, dst)
510        print(f"File copied successfully from {src} to {dst}")
511    except IOError as e:
512        print(f"Unable to copy file. {e}")
513    except Exception as e:
514        print(f"Unexpected error: {e}")
515
516
517def switch_tcp():
518    if not GP.remote_ip: # skip tcp check
519        print("!!! remote_ip is none, skip tcp check !!!")
520        return True
521    if GP.remote_ip == "auto":
522        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
523        if not ipconf:
524            print("!!! device ip not found, skip tcp check !!!")
525            return True
526        GP.remote_ip = ipconf.split(":")[1].split()[0]
527        print(f"fetch remote ip: {GP.remote_ip}")
528    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
529    if ret:
530        time.sleep(3)
531    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
532    if res:
533        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
534    return res
535
536
537def select_cmd():
538    msg = "1) Proceed tester\n" \
539        + "2) Customize tester\n" \
540        + "3) Setup files for transfer\n" \
541        + "4) Load custom testcase(default unused) \n" \
542        + "5) Exit\n" \
543        + ">> "
544
545    while True:
546        opt = input(msg).strip()
547        if len(opt) == 1 and '1' <= opt <= '5':
548            return opt
549
550
551def gen_file(path, size):
552    index = 0
553    path = os.path.abspath(path)
554    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
555
556    while index < size:
557        os.write(fd, os.urandom(1024))
558        index += 1024
559    os.close(fd)
560
561
562def gen_version_file(path):
563    with open(path, "w") as f:
564        f.write(GP.get_version())
565
566
567def gen_zero_file(path, size):
568    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
569    os.write(fd, b'0' * size)
570    os.close(fd)
571
572
573def create_file_with_size(path, size):
574    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
575    os.write(fd, b'\0' * size)
576    os.close(fd)
577
578
579def gen_soft_link():
580    print("generating soft link ...")
581    try:
582        os.symlink("small", os.path.join(GP.local_path, "soft_small"))
583    except FileExistsError:
584        print("soft_small already exists")
585    except OSError:
586        print("生成soft_small失败,需要使用管理员权限用户执行软链接生成")
587
588
589def gen_file_set():
590    print("generating empty file ...")
591    gen_file(os.path.join(GP.local_path, "empty"), 0)
592
593    print("generating small file ...")
594    gen_file(os.path.join(GP.local_path, "small"), 102400)
595
596    print("generating medium file ...")
597    gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2)
598
599    print("generating large file ...")
600    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)
601
602    print("generating text file ...")
603    gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)
604
605    gen_soft_link()
606    print("generating package dir ...")
607    if not os.path.exists(os.path.join(GP.local_path, "package")):
608        os.mkdir(os.path.join(GP.local_path, "package"))
609    for i in range(1, 6):
610        gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2)
611
612    print("generating deep dir ...")
613    deepth = 4
614    deep_path = os.path.join(GP.local_path, "deep_dir")
615    if not os.path.exists(deep_path):
616        os.mkdir(deep_path)
617    for deep in range(deepth):
618        deep_path = os.path.join(deep_path, f"deep_dir{deep}")
619        if not os.path.exists(deep_path):
620            os.mkdir(deep_path)
621    gen_file(os.path.join(deep_path, "deep"), 102400)
622
623    print("generating dir with file ...")
624    dir_path = os.path.join(GP.local_path, "problem_dir")
625    rmdir(dir_path)
626    os.mkdir(dir_path)
627    gen_file(os.path.join(dir_path, "small2"), 102400)
628
629    fuzz_count = 47 # 47 is the count that circulated the file transfer
630    data_unit = 1024 # 1024 is the size that circulated the file transfer
631    data_extra = 936 # 936 is the size that cased the extra file transfer
632    for i in range(fuzz_count):
633        create_file_with_size(
634            os.path.join(dir_path, f"file_{i * data_unit+data_extra}.dat"), i * data_unit + data_extra)
635    print("generating empty dir ...")
636    dir_path = os.path.join(GP.local_path, "empty_dir")
637    rmdir(dir_path)
638    os.mkdir(dir_path)
639    print("generating version file ...")
640    gen_version_file(os.path.join(GP.local_path, "version"))
641
642
643def gen_package_dir():
644    print("generating app dir ...")
645    dir_path = os.path.join(GP.local_path, "app_dir")
646    if os.path.exists(dir_path):
647        rmdir(dir_path)
648    os.mkdir(dir_path)
649    app = os.path.join(GP.local_path, "AACommand07.hap")
650    dst_dir = os.path.join(GP.local_path, "app_dir")
651    if not os.path.exists(app):
652        print(f"Source file {app} does not exist.")
653    else:
654        copy_file(app, dst_dir)
655
656
657def prepare_source():
658    version_file = os.path.join(GP.local_path, "version")
659    if os.path.exists(version_file):
660        with open(version_file, "r") as f:
661            version = f.read()
662            if version == GP.get_version():
663                print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.")
664                return
665    print(f"in prepare {GP.local_path},wait for 2 mins.")
666    current_path = os.getcwd()
667
668    if os.path.exists(GP.local_path):
669        #打开local_path遍历其中的文件,删除hap hsp以外的所有文件
670        for file in os.listdir(GP.local_path):
671            if file.endswith(".hap") or file.endswith(".hsp"):
672                continue
673            file_path = os.path.join(GP.local_path, file)
674            rmdir(file_path)
675    else:
676        os.mkdir(GP.local_path)
677
678    gen_file_set()
679
680
681def add_prepare_source():
682    deep_path = os.path.join(GP.local_path, "deep_test_dir")
683    print("generating deep test dir ...")
684    absolute_path = os.path.abspath(__file__)
685    deepth = (255 - 9 - len(absolute_path)) % 14
686    os.mkdir(deep_path)
687    for deep in range(deepth):
688        deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
689        os.mkdir(deep_path)
690    gen_file(os.path.join(deep_path, "deep_test"), 102400)
691
692    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
693    print("generating recv test dir ...")
694    os.mkdir(recv_dir)
695
696
697def update_source():
698    deep_path = os.path.join(GP.local_path, "deep_test_dir")
699    if not os.path.exists(deep_path):
700        print("generating deep test dir ...")
701        absolute_path = os.path.abspath(__file__)
702        deepth = (255 - 9 - len(absolute_path)) % 14
703        os.mkdir(deep_path)
704        for deep in range(deepth):
705            deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
706            os.mkdir(deep_path)
707        gen_file(os.path.join(deep_path, "deep_test"), 102400)
708
709    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
710    if not os.path.exists(recv_dir):
711        print("generating recv test dir ...")
712        os.mkdir(recv_dir)
713
714
715def load_testcase():
716    if not GP.load_testcase:
717        print("load testcase failed")
718        return False
719    print("load testcase success")
720    return True
721
722
723def check_library_installation(library_name):
724    try:
725        importlib.metadata.version(library_name)
726        return 0
727    except importlib.metadata.PackageNotFoundError:
728        print(f"\n\n{library_name} is not installed.\n\n")
729        print(f"try to use command below:")
730        print(f"pip install {library_name}")
731        return 1
732
733
734def check_subprocess_cmd(cmd, process_num, timeout):
735
736    for i in range(process_num):
737        p = subprocess.Popen(cmd.split())
738    try:
739        p.wait(timeout=5)
740    except subprocess.TimeoutExpired:
741        p.kill()
742
743
744def create_file_commands(local, remote, mode, num):
745    if mode == "send":
746        return [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
747    elif mode == "recv":
748        return [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
749    else:
750        return []
751
752
753def create_dir_commands(local, remote, mode, num):
754    if mode == "send":
755        return [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
756    elif mode == "recv":
757        return [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]
758    else:
759        return []
760
761
762def execute_commands(commands):
763    processes = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in commands]
764    while processes:
765        for p in processes:
766            if not handle_process(p, processes, assert_out="FileTransfer finish"):
767                return False
768    return True
769
770
771def handle_process(p, processes, assert_out="FileTransfer finish"):
772    if p.poll() is not None:
773        stdout, stderr = p.communicate(timeout=512)  # timeout wait 512s
774        if stderr:
775            print(f"{stderr.decode()}")
776        if stdout:
777            print(f"{stdout.decode()}")
778        if assert_out is not None and stdout.decode().find(assert_out) == -1:
779            return False
780        processes.remove(p)
781    return True
782
783
784def check_files(local, remote, mode, num):
785    res = 1
786    for i in range(num):
787        if mode == "send":
788            if _check_file(local, f"{remote}_{i}"):
789                res *= 1
790            else:
791                res *= 0
792        elif mode == "recv":
793            if _check_file(f"{local}_{i}", f"{remote}_{i}"):
794                res *= 1
795            else:
796                res *= 0
797    return res == 1
798
799
800def check_dirs(local, remote, mode, num):
801    res = 1
802    for _ in range(num):
803        if mode == "send":
804            end_of_file_name = os.path.basename(local)
805            if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True):
806                res *= 1
807            else:
808                res *= 0
809        elif mode == "recv":
810            end_of_file_name = os.path.basename(remote)
811            local = os.path.join(local, end_of_file_name)
812            if check_dir(f"{local}", f"{remote}", is_single_dir=True):
813                res *= 1
814            else:
815                res *= 0
816    return res == 1
817
818
819def make_multiprocess_file(local, remote, mode, num, task_type):
820    if num < 1:
821        return False
822
823    if task_type == "file":
824        commands = create_file_commands(local, remote, mode, num)
825    elif task_type == "dir":
826        commands = create_dir_commands(local, remote, mode, num)
827    else:
828        return False
829
830    print(commands[0])
831    if not execute_commands(commands):
832        return False
833
834    if task_type == "file":
835        return check_files(local, remote, mode, num)
836    elif task_type == "dir":
837        return check_dirs(local, remote, mode, num)
838    else:
839        return False
840
841
842def hdc_get_key(cmd):
843    test_cmd = f"{GP.hdc_head} {cmd}"
844    result = subprocess.check_output(test_cmd.split()).decode()
845    return result
846
847
848def check_hdc_version(cmd, version):
849
850    def _convert_version_to_hex(_version):
851        parts = _version.split("Ver: ")[1].split('.')
852        hex_version = ''.join(parts)
853        return int(hex_version, 16)
854
855    expected_version = _convert_version_to_hex(version)
856    cmd = f"{GP.hdc_head} -v"
857    print(f"\nexecuting command: {cmd}")
858    if version is not None: # check output valid
859        output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
860        real_version = _convert_version_to_hex(output)
861        print(f"--> output: {output}")
862        print(f"--> your local [{version}] is"
863            f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
864        )
865        return expected_version <= real_version
866
867
868def check_cmd_time(cmd, pattern, duration, times):
869    if times < 1:
870        print("times should be bigger than 0.")
871        return False
872    if pattern is None:
873        fetchable = True
874    else:
875        fetchable = False
876    start_time = time.time() * 1000
877    print(f"{cmd} start {start_time}")
878    res = []
879    for i in range(times):
880        start_in = time.time() * 1000
881        if pattern is None:
882            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
883        elif not check_shell(cmd, pattern, fetch=fetchable):
884            return False
885        start_out = time.time() * 1000
886        res.append(start_out - start_in)
887
888    # 计算最大值、最小值和中位数
889    max_value = max(res)
890    min_value = min(res)
891    median_value = sorted(res)[len(res) // 2]
892
893    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
894    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
895    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")
896
897    end_time = time.time() * 1000
898
899    try:
900        timecost = int(end_time - start_time) / times
901        print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")
902    except ZeroDivisionError:
903        print(f"除数为0")
904
905    if duration is None:
906        duration = 150 * 1.2
907    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
908    return timecost < duration
909
910
911def check_rom(baseline):
912
913    def _try_get_size(message):
914        try:
915            size = int(message.split('\t')[0])
916        except ValueError:
917            size = -9999 * 1024 # error size
918            print(f"try get size value error, from {message}")
919        return size
920
921    if baseline is None:
922        baseline = 2200
923    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
924    cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd"
925    result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode()
926    hdcd_size = _try_get_size(result_hdcd)
927    cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so"
928    result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode()
929    if "directory" in result_libhdc:
930        cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so"
931        result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode()
932        if "directory" in result_libhdc64:
933            libhdc_size = 0
934        else:
935            libhdc_size = _try_get_size(result_libhdc64)
936    else:
937        libhdc_size = _try_get_size(result_libhdc)
938    all_size = hdcd_size + libhdc_size
939    GP.hdcd_rom = all_size
940    if all_size < 0:
941        GP.hdcd_rom = "error"
942        return False
943    else:
944        GP.hdcd_rom = f"{all_size} KB"
945    if all_size > baseline:
946        print(f"rom size is {all_size}, overlimit baseline {baseline}")
947        return False
948    else:
949        print(f"rom size is {all_size}, underlimit baseline {baseline}")
950        return True
951
952
953def run_command_with_timeout(command, timeout):
954    try:
955        result = subprocess.run(command.split(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
956        return result.stdout.decode(), result.stderr.decode()
957    except subprocess.TimeoutExpired:
958        return "", "Command timed out"
959    except subprocess.CalledProcessError as e:
960        return "", e.stderr.decode()
961
962
963def check_cmd_block(command, pattern, timeout=600):
964    # 启动子进程
965    process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
966
967    # 用于存储子进程的输出
968    output = ""
969
970    try:
971        # 读取子进程的输出
972        output, _ = process.communicate(timeout=timeout)
973    except subprocess.TimeoutExpired:
974        process.terminate()
975        process.kill()
976        output, _ = process.communicate(timeout=timeout)
977
978    print(f"--> output: {output}")
979    if pattern in output:
980        return True
981    else:
982        return False
983
984
985def check_version(version):
986    def decorator(func):
987        @functools.wraps(func)
988        def wrapper(*args, **kwargs):
989            if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
990                print("version does not match, ignore this case")
991                pytest.skip("Version does not match, test skipped.")
992            return func(*args, **kwargs)
993        return wrapper
994    return decorator
995
996
997@pytest.fixture(scope='class', autouse=True)
998def load_gp(request):
999    GP.load()
1000
1001
1002class ConfigFileNotFoundException(Exception):
1003    """配置文件未找到异常"""
1004    pass