• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# 运行环境: python 3.10+, pytest
16
17import hashlib
18import json
19import os
20import re
21import shutil
22import subprocess
23import sys
24import time
25import tempfile
26import functools
27import logging
28import socket
29import platform
30import pytest
31import importlib
32
33logger = logging.getLogger(__name__)
34
35
36class GP(object):
37    """ Global Parameters
38
39    customize here !!!
40    """
41    hdc_exe = "hdc"
42    local_path = "resource"
43    remote_path = "data/local/tmp"
44    remote_dir_path = "data/local/tmp/it_send_dir"
45    remote_ip = "auto"
46    remote_port = 8710
47    hdc_head = "hdc"
48    device_name = ""
49    targets = []
50    tmode = "usb"
51    changed_testcase = "n"
52    testcase_path = "ts_windows.csv"
53    loaded_testcase = 0
54    hdcd_rom = "not checked"
55    debug_app = "com.example.myapplication"
56
57    @classmethod
58    def init(cls):
59        if os.path.exists(".hdctester.conf"):
60            cls.load()
61            cls.start_host()
62            cls.list_targets()
63        else:
64            cls.set_options()
65            cls.print_options()
66            cls.start_host()
67            cls.dump()
68        return
69
70
71    @classmethod
72    def start_host(cls):
73        cmd = f"{cls.hdc_exe} start"
74        res = subprocess.call(cmd.split())
75        return res
76
77    @classmethod
78    def list_targets(cls):
79        try:
80            targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
81        except (OSError, IndexError):
82            targets = [b"failed to auto detect device"]
83            cls.targets = [targets[0].decode()]
84            return False
85        cls.targets = [t.decode() for t in targets]
86        return True
87
88
89    @classmethod
90    def get_device(cls):
91        cls.start_host()
92        cls.list_targets()
93        if len(cls.targets) > 1:
94            print("Multiple device detected, please select one:")
95            for i, t in enumerate(cls.targets):
96                print(f"{i+1}. {t}")
97            print("input the nums of the device above:")
98            cls.device_name = cls.targets[int(input()) - 1]
99        else:
100            cls.device_name = cls.targets[0]
101        if cls.device_name == "failed to auto detect device":
102            print("No device detected, please check your device connection")
103            return False
104        elif cls.device_name == "[empty]":
105            print("No hdc device detected.")
106            return False
107        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
108        return True
109
110
111    @classmethod
112    def dump(cls):
113        try:
114            os.remove(".hdctester.conf")
115        except OSError:
116            pass
117        content = filter(
118            lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items())
119        json_str = json.dumps(dict(content))
120        fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755)
121        os.write(fd, json_str.encode())
122        os.close(fd)
123        return True
124
125
126    @classmethod
127    def load(cls):
128        if not os.path.exists(".hdctester.conf"):
129            raise ConfigFileNotFoundException("No config file found, please run command [python prepare.py]")
130        with open(".hdctester.conf") as f:
131            content = json.load(f)
132            cls.hdc_exe = content.get("hdc_exe")
133            cls.local_path = content.get("local_path")
134            cls.remote_path = content.get("remote_path")
135            cls.remote_ip = content.get("remote_ip")
136            cls.hdc_head = content.get("hdc_head")
137            cls.tmode = content.get("tmode")
138            cls.device_name = content.get("device_name")
139            cls.changed_testcase = content.get("changed_testcase")
140            cls.testcase_path = content.get("testcase_path")
141            cls.loaded_testcase = content.get("load_testcase")
142        return True
143
144
145    @classmethod
146    def print_options(cls):
147        info = "HDC Tester Default Options: \n\n" \
148        + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \
149        + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \
150        + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \
151        + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \
152        + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \
153        + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \
154        + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \
155        + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \
156        + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \
157        + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \
158        + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n"
159        print(info)
160
161
162    @classmethod
163    def tconn_tcp(cls):
164        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
165        if "Connect OK" in res:
166            return True
167        else:
168            return False
169
170
171    @classmethod
172    def set_options(cls):
173        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
174            cls.hdc_exe = opt
175        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
176            cls.local_path = opt
177        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
178            cls.remote_path = opt
179        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
180            cls.remote_ip = opt
181        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
182            cls.remote_port = int(opt)
183        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
184            cls.device_name = opt
185        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
186            cls.tmode = opt
187        if cls.tmode == "usb":
188            ret = cls.get_device()
189            if ret:
190                print("USB device detected.")
191        elif cls.tconn_tcp():
192            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
193        else:
194            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
195            return False
196        return True
197
198
199    @classmethod
200    def change_testcase(cls):
201        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
202            cls.changed_testcase = opt
203            if opt == "n":
204                return False
205        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
206            cls.testcase_path = os.path.join(opt)
207        cls.print_options()
208        return True
209
210
211    @classmethod
212    def load_testcase(cls):
213        print("this fuction will coming soon.")
214        return False
215
216    @classmethod
217    def get_version(cls):
218        version = f"v1.0.6a"
219        return version
220
221
222def pytest_run():
223    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
224    pytest.main()
225    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
226    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
227    report_dir = os.path.join(os.getcwd(), "reports")
228    report_file = os.path.join(report_dir, f"{report_time}report.html")
229    print(f"Test over, the script version is {GP.get_version()},"
230        f" start at {start_time}, end at {end_time} \n"
231        f"=======>{report_file} is saved. \n"
232    )
233    input("=======>press [Enter] key to Show logs.")
234
235
236def rmdir(path):
237    try:
238        if sys.platform == "win32":
239            if os.path.isfile(path) or os.path.islink(path):
240                os.remove(path)
241            else:
242                shutil.rmtree(path)
243        else:
244            subprocess.call(f"rm -rf {path}".split())
245    except OSError:
246        print(f"Error: {path} : cannot remove")
247        pass
248
249
250def get_local_path(path):
251    return os.path.join(GP.local_path, path)
252
253
254def get_remote_path(path):
255    return f"{GP.remote_path}/{path}"
256
257
258def get_local_md5(local):
259    md5_hash = hashlib.md5()
260    with open(local, "rb") as f:
261        for byte_block in iter(lambda: f.read(4096), b""):
262            md5_hash.update(byte_block)
263    return md5_hash.hexdigest()
264
265
266def check_shell_any_device(cmd, pattern=None, fetch=False):
267    print(f"\nexecuting command: {cmd}")
268    if pattern: # check output valid
269        print("pattern case")
270        try:
271            output = subprocess.check_output(cmd.split()).decode('utf-8')
272        except UnicodeDecodeError:
273            output = subprocess.check_output(cmd.split()).decode('gbk')
274        res = pattern in output
275        print(f"--> output: {output}")
276        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
277        return res, output
278    elif fetch:
279        output = subprocess.check_output(cmd.split()).decode()
280        print(f"--> output: {output}")
281        return True, output
282    else: # check cmd run successfully
283        print("other case")
284        return subprocess.check_call(cmd.split()) == 0, ""
285
286
287def check_shell(cmd, pattern=None, fetch=False, head=None):
288    if head is None:
289        head = GP.hdc_head
290    cmd = f"{head} {cmd}"
291    print(f"\nexecuting command: {cmd}")
292    if pattern: # check output valid
293        output = subprocess.check_output(cmd.split()).decode()
294        res = pattern in output
295        print(f"--> output: {output}")
296        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
297        return res
298    elif fetch:
299        output = subprocess.check_output(cmd.split()).decode()
300        print(f"--> output: {output}")
301        return output
302    else: # check cmd run successfully
303        return subprocess.check_call(cmd.split()) == 0
304
305
306def get_shell_result(cmd, pattern=None, fetch=False):
307    cmd = f"{GP.hdc_head} {cmd}"
308    print(f"\nexecuting command: {cmd}")
309    return subprocess.check_output(cmd.split()).decode()
310
311
312def check_rate(cmd, expected_rate):
313    send_result = get_shell_result(cmd)
314    rate = float(send_result.split("rate:")[1].split("kB/s")[0])
315    return rate > expected_rate
316
317
318def check_dir(local, remote, is_single_dir=False):
319    def _get_md5sum(remote, is_single_dir=False):
320        if is_single_dir:
321            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
322        else:
323            cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}}'
324        result = subprocess.check_output(cmd.split()).decode()
325        return result
326
327    def _calculate_md5(file_path):
328        md5 = hashlib.md5()
329        try:
330            with open(file_path, 'rb') as f:
331                for chunk in iter(lambda: f.read(4096), b""):
332                    md5.update(chunk)
333            return md5.hexdigest()
334        except PermissionError:
335            return "PermissionError"
336        except FileNotFoundError:
337            return "FileNotFoundError"
338    print("remote:" + remote)
339    output = _get_md5sum(remote)
340    print(output)
341
342    result = 1
343    for line in output.splitlines():
344        if len(line) < 32: # length of MD5
345            continue
346        expected_md5, file_name = line.split()[:2]
347        if is_single_dir:
348            file_name = file_name.replace(f"{remote}", "")
349        elif GP.remote_path in remote:
350            file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\")
351        else:
352            file_name = file_name.split(remote)[1].replace("/", "\\")
353        file_path = os.path.join(os.getcwd(), GP.local_path) + file_name  # 构建完整的文件路径
354        if is_single_dir:
355            file_path = os.path.join(os.getcwd(), local) + file_name
356        print(file_path)
357        actual_md5 = _calculate_md5(file_path)
358        print(f"Expected: {expected_md5}")
359        print(f"Actual: {actual_md5}")
360        print(f"MD5 matched {file_name}")
361        if actual_md5 != expected_md5:
362            print(f"[Fail]MD5 mismatch for {file_name}")
363            result *= 0
364
365    return (result == 1)
366
367
368def _check_file(local, remote, head=None):
369    if head is None:
370        head = GP.hdc_head
371    if remote.startswith("/proc"):
372        local_size = os.path.getsize(local)
373        if local_size > 0:
374            return True
375        else:
376            return False
377    else:
378        cmd = f"shell md5sum {remote}"
379        local_md5 = get_local_md5(local)
380        return check_shell(cmd, local_md5, head=head)
381
382
383def _check_app_installed(bundle, is_shared=False):
384    dump = "dump-shared" if is_shared else "dump"
385    cmd = f"shell bm {dump} -a"
386    return check_shell(cmd, bundle)
387
388
389def check_hdc_targets():
390    cmd = f"{GP.hdc_head} list targets"
391    print(GP.device_name)
392    return check_shell(cmd, GP.device_name)
393
394
395def check_file_send(local, remote):
396    local_path = os.path.join(GP.local_path, local)
397    remote_path = f"{GP.remote_path}/{remote}"
398    cmd = f"file send {local_path} {remote_path}"
399    return check_shell(cmd) and _check_file(local_path, remote_path)
400
401
402def check_file_recv(remote, local):
403    local_path = os.path.join(GP.local_path, local)
404    remote_path = f"{GP.remote_path}/{remote}"
405    cmd = f"file recv {remote_path} {local_path}"
406    return check_shell(cmd) and _check_file(local_path, remote_path)
407
408
409def check_app_install(app, bundle, args=""):
410    app = os.path.join(GP.local_path, app)
411    install_cmd = f"install {args} {app}"
412    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
413        return check_shell(install_cmd, "failed to install bundle")
414    else:
415        return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)
416
417
418def check_app_uninstall(bundle, args=""):
419    uninstall_cmd = f"uninstall {args} {bundle}"
420    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)
421
422
423def check_app_install_multi(tables, args=""):
424    apps = []
425    bundles = []
426    for app, bundle in tables.items() :
427        app = os.path.join(GP.local_path, app)
428        apps.append(app)
429        bundles.append(bundle)
430
431    apps_str = " ".join(apps)
432    install_cmd = f"install {args} {apps_str}"
433
434    if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str))
435        or (args == "" and 0 == apps_str.count(".hap"))):
436        if not check_shell(install_cmd, "failed to install bundle"):
437            return False
438    else:
439        if not check_shell(install_cmd, "successfully"):
440            return False
441
442        for bundle in bundles:
443            if not _check_app_installed(bundle, "s" in args):
444                return False
445
446    return True
447
448
449def check_app_uninstall_multi(tables, args=""):
450    for app, bundle in tables.items() :
451        if not check_app_uninstall(bundle, args):
452            return False
453
454    return True
455
456
457def check_app_not_exist(app, bundle, args=""):
458    app = os.path.join(GP.local_path, app)
459    install_cmd = f"install {args} {app}"
460    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
461        return check_shell(install_cmd, "Error opening file")
462    return False
463
464
465def check_app_dir_not_exist(app, bundle, args=""):
466    app = os.path.join(GP.local_path, app)
467    install_cmd = f"install {args} {app}"
468    return check_shell(install_cmd, "Not any installation package was found")
469
470
471def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args):
472    if head is None:
473        head = GP.hdc_head
474    if cmd.startswith("file"):
475        if not check_shell(cmd, "FileTransfer finish", head=head):
476            return False
477        if cmd.startswith("file send"):
478            local, remote = cmd.split()[-2:]
479            if remote[-1] == '/' or remote[-1] == '\\':
480                remote = f"{remote}{os.path.basename(local)}"
481        else:
482            remote, local = cmd.split()[-2:]
483            if local[-1] == '/' or local[-1] == '\\':
484                local = f"{local}{os.path.basename(remote)}"
485        if "-b" in cmd:
486            mnt_debug_path = "mnt/debug/100/debug_hap/"
487            remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}"
488        if os.path.isfile(local):
489            return _check_file(local, remote, head=head)
490        else:
491            return check_dir(local, remote, is_single_dir=is_single_dir)
492    elif cmd.startswith("install"):
493        bundle = args.get("bundle", "invalid")
494        opt = " ".join(cmd.split()[1:-1])
495        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)
496
497    elif cmd.startswith("uninstall"):
498        bundle = cmd.split()[-1]
499        opt = " ".join(cmd.split()[1:-1])
500        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)
501
502    else:
503        return check_shell(cmd, pattern, head=head, **args)
504
505
506def check_soft_local(local_source, local_soft, remote):
507    cmd = f"file send {local_soft} {remote}"
508    if not check_shell(cmd, "FileTransfer finish"):
509        return False
510    return _check_file(local_source, remote)
511
512
513def check_soft_remote(remote_source, remote_soft, local_recv):
514    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
515    cmd = f"file recv {remote_soft} {local_recv}"
516    if not check_shell(cmd, "FileTransfer finish"):
517        return False
518    return _check_file(local_recv, get_remote_path(remote_source))
519
520
521def switch_usb():
522    res = check_hdc_cmd("tmode usb")
523    time.sleep(3)
524    if res:
525        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
526    return res
527
528
529def copy_file(src, dst):
530    try:
531        shutil.copy2(src, dst)
532        print(f"File copied successfully from {src} to {dst}")
533    except IOError as e:
534        print(f"Unable to copy file. {e}")
535    except Exception as e:
536        print(f"Unexpected error: {e}")
537
538
539def switch_tcp():
540    if not GP.remote_ip: # skip tcp check
541        print("!!! remote_ip is none, skip tcp check !!!")
542        return True
543    if GP.remote_ip == "auto":
544        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
545        if not ipconf:
546            print("!!! device ip not found, skip tcp check !!!")
547            return True
548        GP.remote_ip = ipconf.split(":")[1].split()[0]
549        print(f"fetch remote ip: {GP.remote_ip}")
550    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
551    if ret:
552        time.sleep(3)
553    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
554    if res:
555        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
556    return res
557
558
559def select_cmd():
560    msg = "1) Proceed tester\n" \
561        + "2) Customize tester\n" \
562        + "3) Setup files for transfer\n" \
563        + "4) Load custom testcase(default unused) \n" \
564        + "5) Exit\n" \
565        + ">> "
566
567    while True:
568        opt = input(msg).strip()
569        if len(opt) == 1 and '1' <= opt <= '5':
570            return opt
571
572
573def gen_file(path, size):
574    index = 0
575    path = os.path.abspath(path)
576    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
577
578    while index < size:
579        os.write(fd, os.urandom(1024))
580        index += 1024
581    os.close(fd)
582
583
584def gen_version_file(path):
585    with open(path, "w") as f:
586        f.write(GP.get_version())
587
588
589def gen_zero_file(path, size):
590    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
591    os.write(fd, b'0' * size)
592    os.close(fd)
593
594
595def create_file_with_size(path, size):
596    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
597    os.write(fd, b'\0' * size)
598    os.close(fd)
599
600
601def gen_soft_link():
602    print("generating soft link ...")
603    depth_1_path = os.path.join(GP.local_path, "d1")
604    depth_2_path = os.path.join(GP.local_path, "d1", "d2")
605    if not os.path.exists(os.path.join(GP.local_path, "d1")):
606        os.mkdir(depth_1_path)
607        os.mkdir(depth_2_path)
608        copy_file(os.path.join(GP.local_path, "small"), depth_2_path)
609    try:
610        os.symlink("small", os.path.join(GP.local_path, "soft_small"))
611    except FileExistsError:
612        print("soft_small already exists")
613    except OSError:
614        print("生成soft_small失败,需要使用管理员权限用户执行软链接生成")
615    try:
616        os.symlink("d1", os.path.join(GP.local_path, "soft_dir"))
617    except FileExistsError:
618        print("soft_dir already exists")
619    except OSError:
620        print("生成soft_dir失败,需要使用管理员权限用户执行软链接生成")
621
622
623def gen_file_set():
624    print("generating empty file ...")
625    gen_file(os.path.join(GP.local_path, "empty"), 0)
626
627    print("generating small file ...")
628    gen_file(os.path.join(GP.local_path, "small"), 102400)
629
630    print("generating medium file ...")
631    gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2)
632
633    print("generating large file ...")
634    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)
635
636    print("generating text file ...")
637    gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)
638
639    gen_soft_link()
640    print("generating package dir ...")
641    if not os.path.exists(os.path.join(GP.local_path, "package")):
642        os.mkdir(os.path.join(GP.local_path, "package"))
643    for i in range(1, 6):
644        gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2)
645
646    print("generating deep dir ...")
647    deepth = 4
648    deep_path = os.path.join(GP.local_path, "deep_dir")
649    if not os.path.exists(deep_path):
650        os.mkdir(deep_path)
651    for deep in range(deepth):
652        deep_path = os.path.join(deep_path, f"deep_dir{deep}")
653        if not os.path.exists(deep_path):
654            os.mkdir(deep_path)
655    gen_file(os.path.join(deep_path, "deep"), 102400)
656
657    print("generating dir with file ...")
658    dir_path = os.path.join(GP.local_path, "problem_dir")
659    rmdir(dir_path)
660    os.mkdir(dir_path)
661    gen_file(os.path.join(dir_path, "small2"), 102400)
662
663    fuzz_count = 47 # 47 is the count that circulated the file transfer
664    data_unit = 1024 # 1024 is the size that circulated the file transfer
665    data_extra = 936 # 936 is the size that cased the extra file transfer
666    for i in range(fuzz_count):
667        create_file_with_size(
668            os.path.join(dir_path, f"file_{i * data_unit+data_extra}.dat"), i * data_unit + data_extra)
669    print("generating empty dir ...")
670    dir_path = os.path.join(GP.local_path, "empty_dir")
671    rmdir(dir_path)
672    os.mkdir(dir_path)
673    print("generating version file ...")
674    gen_version_file(os.path.join(GP.local_path, "version"))
675
676
677def gen_package_dir():
678    print("generating app dir ...")
679    dir_path = os.path.join(GP.local_path, "app_dir")
680    if os.path.exists(dir_path):
681        rmdir(dir_path)
682    os.mkdir(dir_path)
683    app = os.path.join(GP.local_path, "AACommand07.hap")
684    dst_dir = os.path.join(GP.local_path, "app_dir")
685    if not os.path.exists(app):
686        print(f"Source file {app} does not exist.")
687    else:
688        copy_file(app, dst_dir)
689
690
691def prepare_source():
692    version_file = os.path.join(GP.local_path, "version")
693    if os.path.exists(version_file):
694        with open(version_file, "r") as f:
695            version = f.read()
696            if version == GP.get_version():
697                print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.")
698                return
699    print(f"in prepare {GP.local_path},wait for 2 mins.")
700    current_path = os.getcwd()
701
702    if os.path.exists(GP.local_path):
703        #打开local_path遍历其中的文件,删除hap hsp以外的所有文件
704        for file in os.listdir(GP.local_path):
705            if file.endswith(".hap") or file.endswith(".hsp"):
706                continue
707            file_path = os.path.join(GP.local_path, file)
708            rmdir(file_path)
709    else:
710        os.mkdir(GP.local_path)
711
712    gen_file_set()
713
714
715def add_prepare_source():
716    deep_path = os.path.join(GP.local_path, "deep_test_dir")
717    print("generating deep test dir ...")
718    absolute_path = os.path.abspath(__file__)
719    deepth = (255 - 9 - len(absolute_path)) % 14
720    os.mkdir(deep_path)
721    for deep in range(deepth):
722        deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
723        os.mkdir(deep_path)
724    gen_file(os.path.join(deep_path, "deep_test"), 102400)
725
726    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
727    print("generating recv test dir ...")
728    os.mkdir(recv_dir)
729
730
731def update_source():
732    deep_path = os.path.join(GP.local_path, "deep_test_dir")
733    if not os.path.exists(deep_path):
734        print("generating deep test dir ...")
735        absolute_path = os.path.abspath(__file__)
736        deepth = (255 - 9 - len(absolute_path)) % 14
737        os.mkdir(deep_path)
738        for deep in range(deepth):
739            deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
740            os.mkdir(deep_path)
741        gen_file(os.path.join(deep_path, "deep_test"), 102400)
742
743    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
744    if not os.path.exists(recv_dir):
745        print("generating recv test dir ...")
746        os.mkdir(recv_dir)
747
748
749def load_testcase():
750    if not GP.load_testcase:
751        print("load testcase failed")
752        return False
753    print("load testcase success")
754    return True
755
756
757def check_library_installation(library_name):
758    try:
759        importlib.metadata.version(library_name)
760        return 0
761    except importlib.metadata.PackageNotFoundError:
762        print(f"\n\n{library_name} is not installed.\n\n")
763        print(f"try to use command below:")
764        print(f"pip install {library_name}")
765        return 1
766
767
768def check_subprocess_cmd(cmd, process_num, timeout):
769
770    for i in range(process_num):
771        p = subprocess.Popen(cmd.split())
772    try:
773        p.wait(timeout=5)
774    except subprocess.TimeoutExpired:
775        p.kill()
776
777
778def create_file_commands(local, remote, mode, num):
779    if mode == "send":
780        return [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
781    elif mode == "recv":
782        return [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
783    else:
784        return []
785
786
787def create_dir_commands(local, remote, mode, num):
788    if mode == "send":
789        return [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
790    elif mode == "recv":
791        return [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]
792    else:
793        return []
794
795
796def execute_commands(commands):
797    processes = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in commands]
798    while processes:
799        for p in processes:
800            if not handle_process(p, processes, assert_out="FileTransfer finish"):
801                return False
802    return True
803
804
805def handle_process(p, processes, assert_out="FileTransfer finish"):
806    if p.poll() is not None:
807        stdout, stderr = p.communicate(timeout=512)  # timeout wait 512s
808        if stderr:
809            print(f"{stderr.decode()}")
810        if stdout:
811            print(f"{stdout.decode()}")
812        if assert_out is not None and stdout.decode().find(assert_out) == -1:
813            return False
814        processes.remove(p)
815    return True
816
817
818def check_files(local, remote, mode, num):
819    res = 1
820    for i in range(num):
821        if mode == "send":
822            if _check_file(local, f"{remote}_{i}"):
823                res *= 1
824            else:
825                res *= 0
826        elif mode == "recv":
827            if _check_file(f"{local}_{i}", f"{remote}_{i}"):
828                res *= 1
829            else:
830                res *= 0
831    return res == 1
832
833
834def check_dirs(local, remote, mode, num):
835    res = 1
836    for _ in range(num):
837        if mode == "send":
838            end_of_file_name = os.path.basename(local)
839            if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True):
840                res *= 1
841            else:
842                res *= 0
843        elif mode == "recv":
844            end_of_file_name = os.path.basename(remote)
845            local = os.path.join(local, end_of_file_name)
846            if check_dir(f"{local}", f"{remote}", is_single_dir=True):
847                res *= 1
848            else:
849                res *= 0
850    return res == 1
851
852
853def make_multiprocess_file(local, remote, mode, num, task_type):
854    if num < 1:
855        return False
856
857    if task_type == "file":
858        commands = create_file_commands(local, remote, mode, num)
859    elif task_type == "dir":
860        commands = create_dir_commands(local, remote, mode, num)
861    else:
862        return False
863
864    print(commands[0])
865    if not execute_commands(commands):
866        return False
867
868    if task_type == "file":
869        return check_files(local, remote, mode, num)
870    elif task_type == "dir":
871        return check_dirs(local, remote, mode, num)
872    else:
873        return False
874
875
876def hdc_get_key(cmd):
877    test_cmd = f"{GP.hdc_head} {cmd}"
878    result = subprocess.check_output(test_cmd.split()).decode()
879    return result
880
881
882def check_hdc_version(cmd, version):
883
884    def _convert_version_to_hex(_version):
885        parts = _version.split("Ver: ")[1].split('.')
886        hex_version = ''.join(parts)
887        return int(hex_version, 36)
888
889    expected_version = _convert_version_to_hex(version)
890    cmd = f"{GP.hdc_head} {cmd}"
891    print(f"\nexecuting command: {cmd}")
892    if version is not None: # check output valid
893        output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
894        real_version = _convert_version_to_hex(output)
895        print(f"--> output: {output}")
896        print(f"--> your local [{version}] is"
897            f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
898        )
899        return expected_version <= real_version
900
901
902def check_cmd_time(cmd, pattern, duration, times):
903    if times < 1:
904        print("times should be bigger than 0.")
905        return False
906    if pattern is None:
907        fetchable = True
908    else:
909        fetchable = False
910    start_time = time.time() * 1000
911    print(f"{cmd} start {start_time}")
912    res = []
913    for i in range(times):
914        start_in = time.time() * 1000
915        if pattern is None:
916            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
917        elif not check_shell(cmd, pattern, fetch=fetchable):
918            return False
919        start_out = time.time() * 1000
920        res.append(start_out - start_in)
921
922    # 计算最大值、最小值和中位数
923    max_value = max(res)
924    min_value = min(res)
925    median_value = sorted(res)[len(res) // 2]
926
927    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
928    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
929    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")
930
931    end_time = time.time() * 1000
932
933    try:
934        timecost = int(end_time - start_time) / times
935        print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")
936    except ZeroDivisionError:
937        print(f"除数为0")
938
939    if duration is None:
940        duration = 150 * 1.2
941    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
942    return timecost < duration
943
944
945def check_rom(baseline):
946
947    def _try_get_size(message):
948        try:
949            size = int(message.split('\t')[0])
950        except ValueError:
951            size = -9999 * 1024 # error size
952            print(f"try get size value error, from {message}")
953        return size
954
955    if baseline is None:
956        baseline = 2200
957    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
958    cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd"
959    result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode()
960    hdcd_size = _try_get_size(result_hdcd)
961    cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so"
962    result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode()
963    if "directory" in result_libhdc:
964        cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so"
965        result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode()
966        if "directory" in result_libhdc64:
967            libhdc_size = 0
968        else:
969            libhdc_size = _try_get_size(result_libhdc64)
970    else:
971        libhdc_size = _try_get_size(result_libhdc)
972    all_size = hdcd_size + libhdc_size
973    GP.hdcd_rom = all_size
974    if all_size < 0:
975        GP.hdcd_rom = "error"
976        return False
977    else:
978        GP.hdcd_rom = f"{all_size} KB"
979    if all_size > baseline:
980        print(f"rom size is {all_size}, overlimit baseline {baseline}")
981        return False
982    else:
983        print(f"rom size is {all_size}, underlimit baseline {baseline}")
984        return True
985
986
987def run_command_with_timeout(command, timeout):
988    try:
989        result = subprocess.run(command.split(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
990        return result.stdout.decode(), result.stderr.decode()
991    except subprocess.TimeoutExpired:
992        return "", "Command timed out"
993    except subprocess.CalledProcessError as e:
994        return "", e.stderr.decode()
995
996
997def check_cmd_block(command, pattern, timeout=600):
998    # 启动子进程
999    process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
1000
1001    # 用于存储子进程的输出
1002    output = ""
1003
1004    try:
1005        # 读取子进程的输出
1006        output, _ = process.communicate(timeout=timeout)
1007    except subprocess.TimeoutExpired:
1008        process.terminate()
1009        process.kill()
1010        output, _ = process.communicate(timeout=timeout)
1011
1012    print(f"--> output: {output}")
1013    if pattern in output:
1014        return True
1015    else:
1016        return False
1017
1018
1019def check_version(version):
1020    def decorator(func):
1021        @functools.wraps(func)
1022        def wrapper(*args, **kwargs):
1023            if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
1024                print("version does not match, ignore this case")
1025                pytest.skip("Version does not match, test skipped.")
1026            return func(*args, **kwargs)
1027        return wrapper
1028    return decorator
1029
1030
1031@pytest.fixture(scope='class', autouse=True)
1032def load_gp(request):
1033    GP.load()
1034
1035
1036class ConfigFileNotFoundException(Exception):
1037    """配置文件未找到异常"""
1038    pass
1039
1040
1041def get_hdcd_pss():
1042    mem_string = get_shell_result("shell hidumper --mem `pidof hdcd`")
1043    print(f"--> hdcd mem: \n{mem_string}")
1044    pss_string = get_shell_result("shell hidumper --mem `pidof hdcd` | grep Total")
1045    if "Total" in pss_string:
1046        pss_value = int(re.sub(r"\s+", " ", pss_string.split('\n')[1]).split(' ')[2])
1047    else:
1048        print("error: can't get pss value, message:%s", pss_string)
1049        pss_value = 0
1050    return pss_value
1051
1052
1053def get_hdcd_fd_count():
1054    sep = '/'
1055    fd_string = get_shell_result(f"shell ls {sep}proc/`pidof hdcd`/fd | wc -l")
1056    print(f"--> hdcd fd cound: {fd_string}")
1057    try:
1058        end_symbol = get_end_symbol()
1059        fd_value = int(fd_string.split(end_symbol)[0])
1060    except ValueError:
1061        fd_value = 0
1062    return fd_value
1063
1064
1065def get_end_symbol():
1066    return sys.platform == 'win32' and "\r\n" or '\n'
1067
1068
1069def get_server_pid_from_file():
1070    is_ohos = "Harmony" in platform.system()
1071    if not is_ohos:
1072        tmp_dir_path = tempfile.gettempdir()
1073    else:
1074        tmp_dir_path = os.path.expanduser("~")
1075    pid_file = os.path.join(tmp_dir_path, ".HDCServer.pid")
1076    with open(pid_file, "r") as f:
1077        pid = f.read()
1078    try:
1079        pid = int(pid)
1080    except ValueError:
1081        pid = 0
1082    print(f"--> pid of hdcserver is {pid}")
1083    return pid
1084
1085
1086def check_unsupport_systems(systems):
1087    def decorator(func):
1088        @functools.wraps(func)
1089        def wrapper(*args, **kwargs):
1090            cur_sys = platform.system()
1091            for system in systems:
1092                if system in cur_sys:
1093                    print("system not support, ignore this case")
1094                    pytest.skip("System not support, test skipped.")
1095            return func(*args, **kwargs)
1096        return wrapper
1097    return decorator
1098
1099
1100@check_unsupport_systems(["Harmony"])
1101def get_cmd_block_output(command, timeout=600):
1102    # 启动子进程
1103    process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
1104
1105    # 用于存储子进程的输出
1106    output = ""
1107
1108    try:
1109        # 读取子进程的输出
1110        output, _ = process.communicate(timeout=timeout)
1111    except subprocess.TimeoutExpired:
1112        process.terminate()
1113        process.kill()
1114        output, _ = process.communicate(timeout=timeout)
1115
1116    print(f"--> output: {output}")
1117    return output
1118
1119
1120def get_cmd_block_output_and_error(command, timeout=600):
1121    print(f"cmd: {command}")
1122    # 启动子进程
1123    process = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
1124
1125    # 用于存储子进程的输出
1126    output = ""
1127    error = ""
1128
1129    try:
1130        # 读取子进程的输出
1131        output, error = process.communicate(timeout=timeout)
1132    except subprocess.TimeoutExpired:
1133        process.terminate()
1134        process.kill()
1135        output, error = process.communicate(timeout=timeout)
1136
1137    print(f"--> output: {output}")
1138    print(f"--> error: {error}")
1139    return output, error
1140
1141
1142def send_file(conn, file_name):
1143    """
1144    socket收发数据相关的功能函数,用于socket收发数据测试
1145    文件发送端发送文件到接收端
1146    1. Sender send file size(bytes string) to receiver
1147    2. The sender waits for the receiver to send back the file size
1148    3. The sender cyclically sends file data to the receiver
1149    """
1150    logger.info(f"send_file enter:{file_name}")
1151    file_path = os.path.join(GP.local_path, file_name)
1152    logger.info(f"send_file file full name:{file_path}")
1153    file_size = os.path.getsize(file_path)
1154    logger.info(f"send_file file size:{file_size}")
1155
1156    # send size
1157    conn.send(str(file_size).encode('utf-8'))
1158
1159    # recv file size for check
1160    logger.info(f"send_file: start recv check size")
1161    size_raw = conn.recv(1024)
1162    logger.info(f"send_file: check size_raw {size_raw}")
1163    if len(size_raw) == 0:
1164        logger.error(f"send_file: recv check size len is 0, exit")
1165        return
1166    file_size_recv = int(size_raw.decode('utf-8'))
1167    if file_size_recv != file_size:
1168        logger.error(f"send_file: check size failed, file_size_recv:{file_size_recv} file size:{file_size}")
1169        return
1170
1171    logger.info(f"send_file start send file data")
1172    index = 0
1173    with open(file_path, 'rb') as f:
1174        while True:
1175            one_block = f.read(4096)
1176            if not one_block:
1177                logger.info(f"send_file index:{index} read 0 block")
1178                break
1179            conn.send(one_block)
1180            index = index + 1
1181
1182
1183def process_conn(conn, addr):
1184    """
1185    socket收发数据相关的功能函数,用于socket收发数据测试
1186    Server client interaction description:
1187    1. client send "get [file_name]" to server
1188    2. server send file size(string) to client
1189    3. client send back size to server
1190    4. server send file data to client
1191    """
1192    conn.settimeout(5)  # timeout 5 second
1193    try:
1194        logger.info(f"server accept, addr:{str(addr)}")
1195        message = conn.recv(1024)
1196        message_str = message.decode('utf-8')
1197        logger.info(f"conn recv msg [{len(message_str)}] {message_str}")
1198        if len(message) == 0:
1199            conn.close()
1200            logger.info(f"conn msg len is 0, close {conn} addr:{addr}")
1201            return
1202        cmds = message_str.split()
1203        logger.info(f"conn cmds:{cmds}")
1204        cmd = cmds[0]
1205        if cmd == "get":  # ['get', 'xxxx']
1206            logger.info(f"conn cmd is get, file name:{cmds[1]}")
1207            file_name = cmds[1]
1208            send_file(conn, file_name)
1209        logger.info(f"conn normal close")
1210    except socket.timeout:
1211        logger.info(f"conn:{conn} comm timeout, addr:{addr}")
1212    except ConnectionResetError:
1213        logger.info(f"conn:{conn} ConnectionResetError, addr:{addr}")
1214    conn.close()
1215
1216
1217def server_loop(port_num):
1218    """
1219    socket收发数据相关的功能函数,用于socket收发数据测试
1220    服务端,每次监听,接入连接,收发数据结束后关闭监听
1221    """
1222    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1223    server_socket.bind(('localhost', port_num))
1224    server_socket.listen()
1225    logger.info(f"server start listen {port_num}")
1226    server_socket.settimeout(10)  # timeout 10 second
1227
1228    try:
1229        conn, addr = server_socket.accept()
1230        process_conn(conn, addr)
1231    except socket.timeout:
1232        logger.error(f"server accept timeout, port:{port_num}")
1233
1234    server_socket.close()
1235    logger.info(f"server exit, port:{port_num}")
1236
1237
1238def recv_file_data(client_socket, file_path, file_size):
1239    """
1240    socket收发数据相关的功能函数,用于socket收发数据测试
1241    """
1242    logger.info(f"client: start recv file data, file size:{file_size}, file path:{file_path}")
1243    with open(file_path, 'wb') as f:
1244        recv_size = 0
1245        while recv_size < file_size:
1246            one_block = client_socket.recv(4096)
1247            if not one_block:
1248                logger.info(f"client: read block size 0, exit")
1249                break
1250            f.write(one_block)
1251            recv_size += len(one_block)
1252    logger.info(f"client: recv file data finished, recv size:{recv_size}")
1253
1254
1255def client_get_file(port_num, file_name, file_save_name):
1256    """
1257    socket收发数据相关的功能函数,用于socket收发数据测试
1258    """
1259    logger.info(f"client: connect port:{port_num}")
1260    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1261    client_socket.settimeout(10)  # timeout 10 second
1262    try:
1263        client_socket.connect(('localhost', port_num))
1264    except socket.timeout:
1265        logger.info(f"client connect timeout, port:{port_num}")
1266        return
1267
1268    try:
1269        cmd = f"get {file_name}"
1270        logger.info(f"client: send cmd:{cmd}")
1271        client_socket.send(cmd.encode('utf-8'))
1272
1273        # recv file size
1274        size_raw = client_socket.recv(1024)
1275        logger.info(f"client: recv size_raw {size_raw}")
1276        if len(size_raw) == 0:
1277            logger.info(f"client: cmd:{cmd} recv size_raw len is 0, exit")
1278            return
1279        file_size = int(size_raw.decode('utf-8'))
1280        logger.info(f"client: file size {file_size}")
1281
1282        file_path = os.path.join(GP.local_path, file_save_name)
1283        if os.path.exists(file_path):
1284            logger.info(f"client: file {file_path} exist, delete")
1285            try:
1286                os.remove(file_path)
1287            except OSError as error:
1288                logger.info(f"delete {file_path} failed: {error.strerror}")
1289
1290        # send size msg to client for check
1291        logger.info(f"client: Send back file size:{size_raw}")
1292        client_socket.send(size_raw)
1293        recv_file_data(client_socket, file_path, file_size)
1294    except socket.timeout:
1295        logger.error(f"client communication timeout, port:{port_num}")
1296        return
1297    finally:
1298        logger.info("client socket close")
1299        client_socket.close()
1300    logger.info("client exit")