• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2021 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# 0. 运行环境: python 3.10+, pytest
17# 1. 修改 GP 中字段
18# 2. pytest [-k case_name_pattern]
19#    eg. pytest -k file 执行方法名含 file 的用例
20
21# 打包
22# pip install pyinstaller
23# prepare assert source dir includes your data files
24# pyi-makespec  -D --add-data assert:assert dev_hdc_test.py
25# pyinstaller dev_hdc_test.spec
26# 执行 dev_hdc_test.exe
27
28import csv
29import hashlib
30import json
31import logging
32import os
33import random
34import re
35import stat
36import shutil
37import subprocess
38import sys
39import threading
40import time
41from multiprocessing import Process
42
43import pytest
44import pkg_resources
45
46
class GP(object):
    """Global parameters shared by the helpers and test cases in this file.

    All state lives in class attributes and is manipulated through
    classmethods; the class is never instantiated.  Edit the defaults below,
    or answer the prompts of ``set_options``, to customize a run.  The
    settings are persisted as JSON in ``.hdctester.conf`` in the current
    working directory.
    """
    # hdc executable used to build command lines
    hdc_exe = "hdc"
    # local directory that holds the files used by the tests
    local_path = "resource"
    # base directory on the device
    remote_path = "data/local/tmp"
    remote_dir_path = "data/local/tmp/it_send_dir"
    # device ip for tcp mode ("auto" is resolved by switch_tcp)
    remote_ip = "auto"
    remote_port = 8710
    # command prefix put in front of every hdc sub-command
    hdc_head = "hdc"
    device_name = ""
    targets = []
    tmode = "usb"
    changed_testcase = "n"
    testcase_path = "ts_windows.csv"
    loaded_testcase = 0
    debug_app = "com.example.myapplication"

    # Private helpers; names starting with "_" are never written by dump().
    _conf_file = ".hdctester.conf"
    _persisted = (
        "hdc_exe", "local_path", "remote_path", "remote_ip", "remote_port",
        "hdc_head", "tmode", "device_name", "changed_testcase",
        "testcase_path", "loaded_testcase",
    )

    @classmethod
    def init(cls):
        """Configure logging and load (or interactively create) the config.

        With an existing ``.hdctester.conf`` the settings are loaded, the hdc
        host is started and the targets are listed.  Otherwise the user is
        prompted for every option and the answers are saved.
        """
        # basicConfig() is a no-op once the root logger has a handler, so a
        # second call (the original issued one at WARNING level) had no effect.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
            datefmt='%d %b %Y %H:%M:%S',
        )

        if os.path.exists(cls._conf_file):
            cls.load()
            cls.start_host()
            cls.list_targets()
        else:
            cls.set_options()
            cls.print_options()
            cls.start_host()
            cls.dump()
        return

    @classmethod
    def start_host(cls):
        """Run ``<hdc_exe> start`` and return its exit code."""
        cmd = f"{cls.hdc_exe} start"
        return subprocess.call(cmd.split())

    @classmethod
    def list_targets(cls):
        """Fill ``cls.targets`` from ``<hdc_exe> list targets``.

        Returns:
            True when the command ran; False when it could not be executed or
            exited with an error, in which case ``cls.targets`` holds the
            single marker string ``"failed to auto detect device"``.
        """
        try:
            output = subprocess.check_output(f"{cls.hdc_exe} list targets".split())
        except (OSError, subprocess.CalledProcessError):
            # CalledProcessError (non-zero exit) is not an OSError and would
            # otherwise escape.
            cls.targets = ["failed to auto detect device"]
            return False
        cls.targets = [t.decode() for t in output.split()]
        return True

    @classmethod
    def get_device(cls):
        """Pick the device to test and build ``cls.hdc_head`` for it.

        With several targets the user is asked to choose one by number.

        Returns:
            True when a usable device was selected, False otherwise.
        """
        cls.start_host()
        cls.list_targets()
        if not cls.targets:
            # empty command output: nothing to index into
            print("No device detected, please check your device connection")
            return False
        if len(cls.targets) > 1:
            print("Multiple device detected, please select one:")
            for i, t in enumerate(cls.targets):
                print(f"{i+1}. {t}")
            print("input the nums of the device above:")
            # Re-prompt instead of crashing on non-numeric / out-of-range input.
            while True:
                choice = input().strip()
                if choice.isdigit() and 1 <= int(choice) <= len(cls.targets):
                    break
                print(f"invalid selection, input a number between 1 and {len(cls.targets)}:")
            cls.device_name = cls.targets[int(choice) - 1]
        else:
            cls.device_name = cls.targets[0]
        if cls.device_name == "failed to auto detect device":
            print("No device detected, please check your device connection")
            return False
        # NOTE(review): compared case-insensitively because the marker printed
        # by hdc for "no device" may be capitalized ("[Empty]") - confirm.
        elif cls.device_name.lower() == "[empty]":
            print("No hdc device detected.")
            return False
        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
        return True

    @classmethod
    def dump(cls):
        """Write every public, non-method class attribute to the config file.

        Returns:
            True (errors while writing propagate as exceptions).
        """
        try:
            os.remove(cls._conf_file)
        except OSError:
            pass
        content = {
            name: value for name, value in cls.__dict__.items()
            if not name.startswith("_") and not isinstance(value, classmethod)
        }
        json_str = json.dumps(content)
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        # fdopen() guarantees the descriptor is closed even if the write fails
        with os.fdopen(os.open(cls._conf_file, flags, 0o755), "w") as conf:
            conf.write(json_str)
        return True

    @classmethod
    def load(cls):
        """Restore the persisted options from the config file.

        Keys that are absent (or null) in the file keep their current value
        instead of being overwritten with ``None``.  ``loaded_testcase`` is
        read under the name ``dump`` writes it with, and ``remote_port`` is
        restored as well so that a customized port survives a restart.

        Returns:
            True (a missing or malformed file raises).
        """
        with open(cls._conf_file) as f:
            content = json.load(f)
        for key in cls._persisted:
            if content.get(key) is not None:
                setattr(cls, key, content[key])
        return True

    @classmethod
    def print_options(cls):
        """Print the current options as a right-aligned table."""
        rows = (
            ("hdc execution", cls.hdc_exe),
            ("local storage path", cls.local_path),
            ("remote storage path", cls.remote_path),
            ("remote ip", cls.remote_ip),
            ("remote port", cls.remote_port),
            ("device name", cls.device_name),
            ("connect type", cls.tmode),
            ("hdc head", cls.hdc_head),
            ("changed testcase", cls.changed_testcase),
            ("testcase path", cls.testcase_path),
            ("loaded testcase", cls.loaded_testcase),
        )
        info = "HDC Tester Default Options: \n\n" + "".join(
            f"{label.rjust(20, ' ')}: {value}\n" for label, value in rows
        )
        print(info)

    @classmethod
    def tconn_tcp(cls):
        """Run ``tconn <ip>:<port>`` and report whether it printed "Connect OK"."""
        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
        return "Connect OK" in res

    @classmethod
    def set_options(cls):
        """Interactively override the defaults, then connect to the device.

        An empty answer keeps the value shown in brackets.

        Returns:
            False when tcp mode was chosen and ``tconn`` failed, True otherwise
            (including usb mode without a detected device, as before).
        """
        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
            cls.hdc_exe = opt
        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
            cls.local_path = opt
        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
            cls.remote_path = opt
        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
            cls.remote_ip = opt
        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
            try:
                cls.remote_port = int(opt)
            except ValueError:
                # keep the previous port rather than aborting the whole dialog
                print(f"invalid port '{opt}', keep {cls.remote_port}")
        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
            cls.device_name = opt
        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
            cls.tmode = opt
        if cls.tmode == "usb":
            ret = cls.get_device()
            if ret:
                print("USB device detected.")
        elif cls.tconn_tcp():
            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
        else:
            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
            return False
        return True

    @classmethod
    def change_testcase(cls):
        """Ask whether (and to which file) the testcase list should change.

        Returns:
            False when the user answers "n" to the first prompt, else True.
        """
        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
            cls.changed_testcase = opt
            if opt == "n":
                return False
        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
            cls.testcase_path = opt
        cls.print_options()
        return True

    @classmethod
    def load_testcase(cls):
        """Placeholder: custom testcase loading is not implemented; returns False."""
        print("this fuction will coming soon.")
        return False

    @classmethod
    def get_version(cls):
        """Return the version string of this test script."""
        return "v1.0.4a"
238
239
def pytest_run(args):
    """Run the pytest suite ``args.count`` times and print a summary.

    The run is aborted (after waiting for ENTER) when one of the required
    install packages is missing from ``GP.local_path``.

    Args:
        args: object providing ``count``, ``verbose`` and ``desc`` attributes
            (presumably an argparse namespace - confirm with the caller).
            Nothing is executed when ``args.count`` is None.
    """
    required_files = (
        "entry-default-signed-debug.hap",
        "libA_v10001.hsp",
        "libB_v10001.hsp",
    )
    for file in required_files:
        if not os.path.exists(os.path.join(GP.local_path, file)):
            print(f"No {file} File!")
            print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。")
            print("Please unzip package.zip to resource directory, please rerun after operation.")
            input("[ENTER]")
            return
    gen_package_dir()
    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    if args.count is not None:
        for i in range(args.count):
            # 1-based round counter, consistent with the other progress logs
            print(f"------------The {i + 1}/{args.count} Test-------------")
            pytest_args = [
                '--verbose', args.verbose,
                '--report=report.html',
                f"--title=HDC Test Report {GP.get_version()}",
                '--tester=tester001',
                '--template=1',
                f"--desc={args.verbose}:{args.desc}",
            ]
            pytest.main(pytest_args)
    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
    report_dir = os.path.join(os.getcwd(), "reports")
    report_file = os.path.join(report_dir, f"{report_time}report.html")
    # GP does not declare hdcd_rom in its class body; it may be assigned by
    # code elsewhere, so fall back to a placeholder instead of raising
    # AttributeError at the very end of a run.
    hdcd_rom = getattr(GP, "hdcd_rom", "unknown")
    print(f"Test over, the script version is {GP.get_version()},"
        f" start at {start_time}, end at {end_time} \n"
        f"=======>The device hdcd ROM size is {hdcd_rom}.\n"
        f"=======>{report_file} is saved. \n"
    )
    input("=======>press [Enter] key to Show logs.")
277
278
def rmdir(path):
    """Remove a file, symlink or directory tree; a missing path is ignored.

    The removal is done with the standard library on every platform.  The
    previous non-Windows branch ran ``rm -rf`` on a whitespace-split command
    line, which deleted the wrong paths whenever ``path`` contained a space.

    Args:
        path: file or directory to delete.
    """
    try:
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        elif os.path.lexists(path):
            # regular file or symlink (a link to a directory is only unlinked)
            os.remove(path)
    except OSError:
        print(f"Error: {path} : cannot remove")
291
292
def get_local_path(path):
    """Return ``path`` joined onto the local resource directory ``GP.local_path``."""
    local_root = GP.local_path
    return os.path.join(local_root, path)
295
296
def get_remote_path(path):
    """Return ``path`` placed under the device directory ``GP.remote_path``."""
    return "{}/{}".format(GP.remote_path, path)
299
300
def get_local_md5(local):
    """Return the hexadecimal MD5 digest of the local file ``local``.

    The file is read in 4096-byte chunks so large files are not loaded into
    memory at once.
    """
    digest = hashlib.md5()
    with open(local, "rb") as stream:
        while chunk := stream.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
307
308
def check_shell_any_device(cmd, pattern=None, fetch=False):
    """Run ``cmd`` as given (no hdc head is added) and evaluate the result.

    Args:
        cmd: complete command line; it is split on whitespace.
        pattern: when truthy, the text that must occur in the output.
        fetch: when truthy (and no pattern is given), return the output.

    Returns:
        A ``(ok, output)`` tuple: ``(pattern found, output)`` with a pattern,
        ``(True, output)`` when fetching, otherwise ``(True, "")`` once the
        command exited with status 0.

    Raises:
        subprocess.CalledProcessError: the command exited with non-zero status.
    """
    print(f"\nexecuting command: {cmd}")
    argv = cmd.split()
    if not pattern and not fetch:
        # only the exit status matters
        print("other case")
        return subprocess.check_call(argv) == 0, ""
    if pattern:
        print("pattern case")
    text = subprocess.check_output(argv).decode()
    print(f"--> output: {text}")
    if not pattern:
        return True, text
    found = pattern in text
    print(f"--> pattern [{pattern}] {'FOUND' if found else 'NOT FOUND'} in output")
    return found, text
325
326
def check_shell(cmd, pattern=None, fetch=False, head=None):
    """Run an hdc sub-command and evaluate its result.

    Args:
        cmd: sub-command appended to the head; the full line is split on
            whitespace.
        pattern: when truthy, the text that must occur in the output.
        fetch: when truthy (and no pattern is given), return the output.
        head: command prefix; defaults to ``GP.hdc_head``.

    Returns:
        ``bool`` (pattern found) with a pattern, the decoded output when
        fetching, otherwise ``True`` once the command exited with status 0.

    Raises:
        subprocess.CalledProcessError: the command exited with non-zero status.
    """
    prefix = GP.hdc_head if head is None else head
    full_cmd = f"{prefix} {cmd}"
    print(f"\nexecuting command: {full_cmd}")
    argv = full_cmd.split()
    if not (pattern or fetch):
        # only the exit status matters
        return subprocess.check_call(argv) == 0
    text = subprocess.check_output(argv).decode()
    print(f"--> output: {text}")
    if not pattern:
        return text
    found = pattern in text
    print(f"--> pattern [{pattern}] {'FOUND' if found else 'NOT FOUND'} in output")
    return found
344
345
def get_shell_result(cmd, pattern=None, fetch=False):
    """Run ``cmd`` behind ``GP.hdc_head`` and return its decoded output.

    ``pattern`` and ``fetch`` are accepted but not used by this function.

    Raises:
        subprocess.CalledProcessError: the command exited with non-zero status.
    """
    full_cmd = "{} {}".format(GP.hdc_head, cmd)
    print(f"\nexecuting command: {full_cmd}")
    raw = subprocess.check_output(full_cmd.split())
    return raw.decode()
350
351
def check_rate(cmd, expected_rate):
    """Check that the transfer rate reported by ``cmd`` exceeds a threshold.

    The rate is the number between ``rate:`` and ``kB/s`` in the command
    output.

    Args:
        cmd: hdc sub-command passed to ``get_shell_result``.
        expected_rate: minimum rate (same unit as the reported value).

    Returns:
        True when the reported rate is greater than ``expected_rate``; False
        when it is not, or when the output carries no parsable rate (for
        example because the transfer failed) instead of raising
        IndexError/ValueError.
    """
    send_result = get_shell_result(cmd)
    match = re.search(r"rate:(.*?)kB/s", send_result, re.S)
    if match is None:
        print(f"no transfer rate found in output: {send_result}")
        return False
    try:
        rate = float(match.group(1))
    except ValueError:
        print(f"unparsable transfer rate: {match.group(1)!r}")
        return False
    return rate > expected_rate
356
357
def check_dir(local, remote, is_single_dir=False):
    """Compare the MD5 sums of the files under ``remote`` with local copies.

    The device is asked for ``md5sum`` lines; for every line the matching
    local file is located and hashed.

    Args:
        local: local directory (relative to the working directory); used to
            build the local file path only when ``is_single_dir`` is true.
        remote: directory on the device.
        is_single_dir: when true the local path is ``<cwd>/<local>`` plus the
            part of the remote file name that follows ``remote``; otherwise
            it is derived from ``GP.local_path`` with "/" turned into "\\"
            (NOTE(review): that branch looks Windows specific - confirm).

    Returns:
        True when every reported file matches, False otherwise.  Unreadable
        or missing local files count as a mismatch.
    """
    def _get_md5sum(remote, is_single_dir=False):
        if is_single_dir:
            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
        else:
            # "\\;" spells the same backslash-semicolon as before without the
            # invalid "\;" escape sequence warning
            cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \\;'
        return subprocess.check_output(cmd.split()).decode()

    def _calculate_md5(file_path):
        md5 = hashlib.md5()
        try:
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    md5.update(chunk)
            return md5.hexdigest()
        except PermissionError:
            return "PermissionError"
        except FileNotFoundError:
            return "FileNotFoundError"

    print("remote:" + remote)
    # NOTE(review): is_single_dir is not forwarded here, so the recursive
    # `find` variant is always used; kept as in the original - confirm intent.
    output = _get_md5sum(remote)
    print(output)

    all_matched = True
    for line in output.splitlines():
        if len(line) < 32: # length of MD5
            continue
        fields = line.split()
        if len(fields) < 2:
            # not a "<md5> <file>" line; nothing to compare
            continue
        expected_md5, file_name = fields[:2]
        if is_single_dir:
            file_name = file_name.replace(f"{remote}", "")
            file_path = os.path.join(os.getcwd(), local) + file_name
        else:
            if GP.remote_path in remote:
                file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\")
            else:
                file_name = file_name.split(remote)[1].replace("/", "\\")
            # build the full local file path
            file_path = os.path.join(os.getcwd(), GP.local_path) + file_name
        print(file_path)
        actual_md5 = _calculate_md5(file_path)
        print(f"Expected: {expected_md5}")
        print(f"Actual: {actual_md5}")
        if actual_md5 == expected_md5:
            # previously printed unconditionally, even for mismatches
            print(f"MD5 matched {file_name}")
        else:
            print(f"[Fail]MD5 mismatch for {file_name}")
            all_matched = False

    return all_matched
406
407
def _check_file(local, remote, head=None):
    """Verify that local file ``local`` corresponds to device file ``remote``.

    Remote paths starting with ``/proc`` are only checked for a non-empty
    local copy; every other path is compared through ``md5sum`` run on the
    device.

    Args:
        local: path of the local file.
        remote: path of the file on the device.
        head: hdc command prefix; defaults to ``GP.hdc_head``.

    Returns:
        True when the check passes, False otherwise.
    """
    if head is None:
        head = GP.hdc_head
    if remote.startswith("/proc"):
        return os.path.getsize(local) > 0
    expected_md5 = get_local_md5(local)
    return check_shell(f"shell md5sum {remote}", expected_md5, head=head)
421
422
def _check_app_installed(bundle, is_shared=False):
    """Report whether ``bundle`` appears in the device's bundle list.

    Uses ``bm dump-shared -a`` when ``is_shared`` is true, ``bm dump -a``
    otherwise, and looks for ``bundle`` in the output.
    """
    sub_command = "dump"
    if is_shared:
        sub_command = "dump-shared"
    return check_shell(f"shell bm {sub_command} -a", bundle)
427
428
def check_hdc_targets():
    """Check that ``GP.device_name`` is reported by ``list targets``.

    ``check_shell`` already prepends ``GP.hdc_head``; building the command
    with the head here as well produced ``<head> <head> list targets``.

    Returns:
        True when the device name occurs in the command output.
    """
    cmd = "list targets"
    print(GP.device_name)
    return check_shell(cmd, GP.device_name)
433
434
def check_file_send(local, remote):
    """Send ``local`` to ``remote`` on the device and verify the copy.

    Both names are relative: ``local`` to ``GP.local_path`` and ``remote`` to
    ``GP.remote_path``.

    Returns:
        The falsy result of the transfer command, or the result of the file
        comparison when the transfer succeeded.
    """
    src = os.path.join(GP.local_path, local)
    dst = f"{GP.remote_path}/{remote}"
    sent = check_shell(f"file send {src} {dst}")
    if not sent:
        return sent
    return _check_file(src, dst)
440
441
def check_file_recv(remote, local):
    """Receive ``remote`` from the device into ``local`` and verify the copy.

    Both names are relative: ``remote`` to ``GP.remote_path`` and ``local`` to
    ``GP.local_path``.

    Returns:
        The falsy result of the transfer command, or the result of the file
        comparison when the transfer succeeded.
    """
    dst = os.path.join(GP.local_path, local)
    src = f"{GP.remote_path}/{remote}"
    received = check_shell(f"file recv {src} {dst}")
    if not received:
        return received
    return _check_file(dst, src)
447
448
def check_app_install(app, bundle, args=""):
    """Install ``app`` (relative to ``GP.local_path``) and check the outcome.

    Installing a ``.hap`` with ``-s`` or a ``.hsp`` without options is
    expected to fail; in those cases the check passes when the output
    contains "failed to install bundle".  Otherwise the output must contain
    "successfully" and ``bundle`` must be listed afterwards.

    Args:
        app: package file name.
        bundle: bundle name looked up after a successful install.
        args: install options; a value containing "s" selects the shared
            bundle listing.
    """
    package = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {package}"
    hap_as_shared = args == "-s" and package.endswith(".hap")
    hsp_as_normal = args == "" and package.endswith(".hsp")
    if hap_as_shared or hsp_as_normal:
        return check_shell(install_cmd, "failed to install bundle")
    return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)
456
457
def check_app_uninstall(bundle, args=""):
    """Uninstall ``bundle`` and confirm it is no longer listed.

    Returns:
        The falsy result of the uninstall command, or True/False depending on
        whether the bundle has disappeared from the bundle listing.
    """
    removed = check_shell(f"uninstall {args} {bundle}", "successfully")
    if not removed:
        return removed
    return not _check_app_installed(bundle, "s" in args)
461
462
def check_app_install_multi(tables, args=""):
    """Install several packages with one command and check the outcome.

    Package kinds are now detected by file extension (as in
    ``check_app_install``) instead of ``re.search(".hap", ...)``, whose
    unescaped dot matched any character followed by "hap".

    The install is expected to fail when a ``.hap`` is installed with ``-s``,
    when ``.hap`` and ``.hsp`` packages are mixed, or when no ``.hap`` is
    installed without options.

    Args:
        tables: mapping of package file name (relative to ``GP.local_path``)
            to bundle name.
        args: install options; a value containing "s" selects the shared
            bundle listing.

    Returns:
        True when the command output and the bundle listing match the
        expectation, False otherwise.
    """
    apps = [os.path.join(GP.local_path, app_name) for app_name in tables]
    bundles = list(tables.values())

    apps_str = " ".join(apps)
    install_cmd = f"install {args} {apps_str}"

    has_hap = any(app.endswith(".hap") for app in apps)
    has_hsp = any(app.endswith(".hsp") for app in apps)
    expect_failure = (
        (args == "-s" and has_hap)
        or (has_hsp and has_hap)
        or (args == "" and not has_hap)
    )

    if expect_failure:
        return bool(check_shell(install_cmd, "failed to install bundle"))

    if not check_shell(install_cmd, "successfully"):
        return False
    return all(_check_app_installed(bundle, "s" in args) for bundle in bundles)
487
488
def check_app_uninstall_multi(tables, args=""):
    """Uninstall every bundle named in ``tables`` (values), stopping at the first failure.

    Returns:
        True when all bundles were uninstalled, False otherwise.
    """
    return all(check_app_uninstall(bundle, args) for bundle in tables.values())
495
496
def check_hdc_cmd(cmd, pattern=None, head=None, is_single_dir=True, **args):
    """Run an hdc sub-command and verify its effect according to its kind.

    * ``file ...``: the output must contain "FileTransfer finish"; then the
      transferred file (or directory) is compared between host and device.
      The last two words of ``cmd`` are taken as the two paths.
    * ``install ...``: the output must contain "successfully" and the bundle
      given as ``bundle=...`` must be listed.
    * ``uninstall ...``: the output must contain "successfully" and the
      bundle (last word of ``cmd``) must no longer be listed.
    * anything else is delegated to ``check_shell`` with ``pattern`` and the
      extra keyword arguments.

    Args:
        cmd: hdc sub-command without the head.
        pattern: expected output text for generic commands.
        head: hdc command prefix; defaults to ``GP.hdc_head``.  It is used by
            the file and generic branches only.
        is_single_dir: forwarded to ``check_dir`` for directory transfers.
        **args: ``bundle`` for installs, otherwise forwarded to ``check_shell``.

    Returns:
        The check result (the fetched output for generic commands run with
        ``fetch=True``).
    """
    if head is None:
        head = GP.hdc_head
    tokens = cmd.split()
    if cmd.startswith("file"):
        if not check_shell(cmd, "FileTransfer finish", head=head):
            return False
        if cmd.startswith("file send"):
            local, remote = tokens[-2:]
        else:
            remote, local = tokens[-2:]
        # Look for the option as a whole word: the former substring test also
        # fired for file names that merely contain "-b".
        if "-b" in tokens:
            # presumably "-b" targets the debug application's directory - confirm
            mnt_debug_path = "mnt/debug/100/debug_hap/"
            remote = f"{mnt_debug_path}/{GP.debug_app}/{remote}"
        if os.path.isfile(local):
            return _check_file(local, remote, head=head)
        else:
            return check_dir(local, remote, is_single_dir=is_single_dir)
    elif cmd.startswith("install"):
        bundle = args.get("bundle", "invalid")
        opt = " ".join(tokens[1:-1])
        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)

    elif cmd.startswith("uninstall"):
        bundle = tokens[-1]
        opt = " ".join(tokens[1:-1])
        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)

    else:
        return check_shell(cmd, pattern, head=head, **args)
526
527
def check_soft_local(local_source, local_soft, remote):
    """Send ``local_soft`` to ``remote`` and compare the result with ``local_source``.

    Returns:
        False when the transfer did not report "FileTransfer finish",
        otherwise the result of comparing ``local_source`` with ``remote``.
    """
    sent = check_shell(f"file send {local_soft} {remote}", "FileTransfer finish")
    return _check_file(local_source, remote) if sent else False
533
534
def check_soft_remote(remote_source, remote_soft, local_recv):
    """Create a symlink on the device, receive through it and verify the file.

    ``ln -s remote_source remote_soft`` is run on the device (its result is
    not checked), then ``remote_soft`` is received into ``local_recv``.

    Returns:
        False when the transfer did not report "FileTransfer finish",
        otherwise the result of comparing ``local_recv`` with
        ``get_remote_path(remote_source)``.
    """
    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
    received = check_shell(f"file recv {remote_soft} {local_recv}", "FileTransfer finish")
    return _check_file(local_recv, get_remote_path(remote_source)) if received else False
541
542
def switch_usb():
    """Switch the device to usb mode and, on success, address it by name.

    Waits 3 seconds after issuing ``tmode usb`` regardless of the outcome.

    Returns:
        The result of the ``tmode usb`` command check.
    """
    switched = check_hdc_cmd("tmode usb")
    time.sleep(3)
    if not switched:
        return switched
    GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
    return switched
549
550
def copy_file(src, dst):
    """Copy ``src`` to ``dst`` with ``shutil.copy2``; failures are only printed.

    No exception derived from ``Exception`` leaves this function.
    """
    try:
        shutil.copy2(src, dst)
        print(f"File copied successfully from {src} to {dst}")
    except Exception as err:
        # IOError is an alias of OSError, so this reproduces both handlers
        if isinstance(err, OSError):
            print(f"Unable to copy file. {err}")
        else:
            print(f"Unexpected error: {err}")
559
560
def switch_tcp():
    """Switch the device to tcp mode and connect to it.

    With ``GP.remote_ip`` empty the tcp check is skipped.  With "auto" the ip
    is taken from the device's ``ifconfig`` output (text after the first ":"
    up to the next whitespace) and stored in ``GP.remote_ip``.

    Returns:
        True when the check is skipped, otherwise whether ``tconn`` reported
        "Connect OK"; on success ``GP.hdc_head`` addresses ``ip:port``.
    """
    if not GP.remote_ip:
        # skip tcp check
        print("!!! remote_ip is none, skip tcp check !!!")
        return True

    if GP.remote_ip == "auto":
        query = 'shell "ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6"'
        ipconf = check_hdc_cmd(query, fetch=True)
        if not ipconf:
            print("!!! device ip not found, skip tcp check !!!")
            return True
        after_colon = ipconf.split(":")[1]
        GP.remote_ip = after_colon.split()[0]
        print(f"fetch remote ip: {GP.remote_ip}")

    if check_hdc_cmd(f"tmode port {GP.remote_port}"):
        # give the daemon time to come back in tcp mode
        time.sleep(3)

    connected = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
    if connected:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
    return connected
579
580
def select_cmd():
    """Show the main menu until the user enters a digit from 1 to 5.

    Returns:
        The chosen option as a one-character string ("1" ... "5").
    """
    menu = "".join((
        "1) Proceed tester\n",
        "2) Customize tester\n",
        "3) Setup files for transfer\n",
        "4) Load custom testcase(default unused) \n",
        "5) Exit\n",
        ">> ",
    ))
    valid = {"1", "2", "3", "4", "5"}
    choice = input(menu).strip()
    while choice not in valid:
        choice = input(menu).strip()
    return choice
593
594
def gen_file(path, size):
    """Create ``path`` filled with exactly ``size`` random bytes.

    Compared with the previous implementation the file is truncated first
    (regenerating a smaller file no longer keeps the old tail), the size is
    exact instead of rounded up to a multiple of 1024, the descriptor is
    closed even when a write fails, and data is written in 1 MiB chunks.

    Args:
        path: file to create or overwrite (mode 0o755 when created).
        size: number of bytes; zero or a negative value gives an empty file.
    """
    path = os.path.abspath(path)
    # O_BINARY exists on Windows only and prevents newline translation there
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    chunk_size = 1024 * 1024
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        remaining = size
        while remaining > 0:
            step = min(chunk_size, remaining)
            out.write(os.urandom(step))
            remaining -= step
604
605
def gen_zero_file(path, size):
    """Create ``path`` containing ``size`` ASCII "0" characters.

    The file is truncated first, written in 1 MiB chunks (instead of building
    one ``size``-byte buffer and relying on a single ``os.write``), and the
    descriptor is closed even when a write fails.

    Args:
        path: file to create or overwrite (mode 0o755 when created).
        size: number of bytes; zero or a negative value gives an empty file.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    chunk = b'0' * (1024 * 1024)
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        remaining = size
        while remaining > 0:
            step = min(len(chunk), remaining)
            out.write(chunk[:step])
            remaining -= step
610
611
def create_file_with_size(path, size):
    """Create ``path`` containing ``size`` NUL (``\\0``) bytes.

    The file is truncated first, written in 1 MiB chunks, and the descriptor
    is closed even when a write fails.

    Args:
        path: file to create or overwrite (mode 0o755 when created).
        size: number of bytes; zero or a negative value gives an empty file.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    chunk = b'\0' * (1024 * 1024)
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        remaining = size
        while remaining > 0:
            step = min(len(chunk), remaining)
            out.write(chunk[:step])
            remaining -= step
616
617
def gen_file_set():
    """Generate the complete set of test files below ``GP.local_path``.

    Creates the random files ``empty``, ``small`` (100 KiB), ``medium``
    (200 MiB) and ``large`` (2 GiB), the symlink ``soft_small`` -> ``small``,
    the text file ``word_100M.txt``, a ``package`` directory with five 20 MiB
    files, a nested ``deep_dir`` tree, a freshly created ``problem_dir`` with
    one random file plus 47 NUL-filled files of increasing size, a fresh
    ``empty_dir`` and finally an empty marker file named after the script
    version.
    """
    root = GP.local_path

    def in_root(*parts):
        return os.path.join(root, *parts)

    print("generating empty file ...")
    gen_file(in_root("empty"), 0)

    print("generating small file ...")
    gen_file(in_root("small"), 102400)

    print("generating medium file ...")
    gen_file(in_root("medium"), 200 * 1024 ** 2)

    print("generating large file ...")
    gen_file(in_root("large"), 2 * 1024 ** 3)

    print("generating soft link ...")
    try:
        os.symlink("small", in_root("soft_small"))
    except FileExistsError:
        print("soft_small already exists")

    print("generating text file ...")
    gen_zero_file(in_root("word_100M.txt"), 100 * 1024 ** 2)

    print("generating package dir ...")
    package_dir = in_root("package")
    if not os.path.exists(package_dir):
        os.mkdir(package_dir)
    for index in range(1, 6):
        gen_file(os.path.join(package_dir, f"fake.hap.{index}"), 20 * 1024 ** 2)

    print("generating deep dir ...")
    depth = 4
    # deep_dir/deep_dir0/deep_dir1/... ; every level is created if missing
    chain = [in_root("deep_dir")]
    for level in range(depth):
        chain.append(os.path.join(chain[-1], f"deep_dir{level}"))
    for directory in chain:
        if not os.path.exists(directory):
            os.mkdir(directory)
    gen_file(os.path.join(chain[-1], "deep"), 102400)

    print("generating dir with file ...")
    problem_dir = in_root("problem_dir")
    rmdir(problem_dir)
    os.mkdir(problem_dir)
    gen_file(os.path.join(problem_dir, "small2"), 102400)

    fuzz_count = 47 # number of files generated for the file transfer loop
    data_unit = 1024 # size step between two consecutive files
    data_extra = 936 # extra size added to every file
    for step in range(fuzz_count):
        file_size = step * data_unit + data_extra
        create_file_with_size(os.path.join(problem_dir, f"file_{file_size}.dat"), file_size)

    print("generating empty dir ...")
    empty_dir = in_root("empty_dir")
    rmdir(empty_dir)
    os.mkdir(empty_dir)

    print("generating version file ...")
    gen_file(in_root(GP.get_version()), 0)
680
681
def gen_package_dir():
    """Recreate ``<local_path>/app_dir`` and copy the debug hap into it.

    Only a message is printed when ``entry-default-signed-debug.hap`` is
    missing from ``GP.local_path``.
    """
    print("generating app dir ...")
    app_dir = os.path.join(GP.local_path, "app_dir")
    rmdir(app_dir)
    os.mkdir(app_dir)
    hap_file = os.path.join(GP.local_path, "entry-default-signed-debug.hap")
    if os.path.exists(hap_file):
        copy_file(hap_file, os.path.join(GP.local_path, "app_dir"))
    else:
        print(f"Source file {hap_file} does not exist.")
693
694
def prepare_source():
    """Make sure the local test files exist for the current script version.

    Nothing is done when the version marker file is already present in
    ``GP.local_path``.  Otherwise everything except ``.hap``/``.hsp`` packages
    is removed from that directory (it is created when missing) and the file
    set is regenerated.
    """
    if os.path.exists(os.path.join(GP.local_path, GP.get_version())):
        # get_version must be called; the bare attribute printed the repr of
        # the bound method instead of the version string
        print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.")
        return
    print(f"in prepare {GP.local_path},wait for 2 mins.")

    if os.path.exists(GP.local_path):
        # walk local_path and delete everything except hap / hsp packages
        for file in os.listdir(GP.local_path):
            if file.endswith((".hap", ".hsp")):
                continue
            file_path = os.path.join(GP.local_path, file)
            rmdir(file_path)
    else:
        os.mkdir(GP.local_path)

    gen_file_set()
713
714
def add_prepare_source():
    """Create the ``deep_test_dir`` tree and ``recv_test_dir`` in ``GP.local_path``.

    The directories are created unconditionally, so ``os.mkdir`` raises
    ``FileExistsError`` when one of them already exists.
    """
    deep_root = os.path.join(GP.local_path, "deep_test_dir")
    print("generating deep test dir ...")
    script_path = os.path.abspath(__file__)
    # NOTE(review): the nesting depth is a remainder (% 14) of a value derived
    # from the script path length; "// 14" may have been intended - confirm.
    levels = (255 - 9 - len(script_path)) % 14
    os.mkdir(deep_root)
    current = deep_root
    for level in range(levels):
        current = os.path.join(current, f"deep_test_dir{level}")
        os.mkdir(current)
    gen_file(os.path.join(current, "deep_test"), 102400)

    print("generating recv test dir ...")
    os.mkdir(os.path.join(GP.local_path, "recv_test_dir"))
729
730
def update_source():
    """Create ``deep_test_dir`` and ``recv_test_dir`` when they are missing.

    An existing ``deep_test_dir`` is left untouched, including its contents.
    """
    deep_root = os.path.join(GP.local_path, "deep_test_dir")
    if not os.path.exists(deep_root):
        print("generating deep test dir ...")
        script_path = os.path.abspath(__file__)
        # NOTE(review): the nesting depth is a remainder (% 14) of a value
        # derived from the script path length; "// 14" may have been
        # intended - confirm.
        levels = (255 - 9 - len(script_path)) % 14
        os.mkdir(deep_root)
        current = deep_root
        for level in range(levels):
            current = os.path.join(current, f"deep_test_dir{level}")
            os.mkdir(current)
        gen_file(os.path.join(current, "deep_test"), 102400)

    recv_root = os.path.join(GP.local_path, "recv_test_dir")
    if os.path.exists(recv_root):
        return
    print("generating recv test dir ...")
    os.mkdir(recv_root)
747
748
def setup_tester():
    """Drive the interactive setup menu.

    Options 2 (customize), 3 (prepare files) and 4 (load testcases) return to
    the menu when they succeed.

    Returns:
        True when the user chooses to proceed (option 1); False on exit
        (option 5) or when customizing or testcase loading fails.
    """
    while True:
        GP.print_options()
        choice = int(select_cmd())
        if choice == 2:
            if not GP.set_options():
                return False
            GP.dump()
        elif choice == 3:
            prepare_source()
        elif choice == 4:
            if not GP.load_testcase():
                return False
        else:
            # 1 proceeds with the tests; 5 (or anything else) quits
            return choice == 1
768
769
def load_testcase():
    """Load the custom testcases through ``GP.load_testcase`` and report the result.

    The method is now actually called; previously the bound method object
    itself was tested, which is always truthy, so success was reported
    unconditionally.

    Returns:
        True when ``GP.load_testcase()`` succeeds, False otherwise.
    """
    if not GP.load_testcase():
        print("load testcase failed")
        return False
    print("load testcase success")
    return True
776
777
def check_library_installation(library_name):
    """Check whether the distribution ``library_name`` is installed.

    The lookup uses the standard library's ``importlib.metadata`` rather than
    the deprecated ``pkg_resources`` API.

    Args:
        library_name: distribution name as used with ``pip install``.

    Returns:
        0 when the distribution is installed, 1 (after printing an install
        hint) when it is not.
    """
    # local import: keeps this block independent of the file's import list
    from importlib import metadata

    try:
        metadata.distribution(library_name)
    except metadata.PackageNotFoundError:
        print(f"\n\n{library_name} is not installed.\n\n")
        print("try to use command below:")
        print(f"pip install {library_name}")
        return 1
    return 0
787
788
def check_subprocess_cmd(cmd, process_num, timeout):
    """Start ``process_num`` copies of ``cmd`` and wait for all of them.

    Previously only the last process was waited for, the ``timeout`` argument
    was ignored in favour of a fixed 5 seconds, and ``process_num == 0``
    raised NameError.

    Args:
        cmd: command line; it is split on whitespace.
        process_num: number of processes to start.
        timeout: overall time budget in seconds for all processes, or None to
            wait without limit.  Processes still running afterwards are
            killed.
    """
    procs = [subprocess.Popen(cmd.split()) for _ in range(process_num)]
    deadline = None if timeout is None else time.monotonic() + timeout
    for proc in procs:
        remaining = None if deadline is None else max(0, deadline - time.monotonic())
        try:
            proc.wait(timeout=remaining)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()  # reap the killed process
797
798
def check_process_mixed(process_num, timeout, local, remote):
    """Run staggered concurrent sends, then staggered concurrent receives.

    For every round and every size in {small, medium, empty} one send and one
    receive process (each running ``check_hdc_cmd``) is prepared.  All sends
    are started with a random 0-1 s pause in between and joined; the
    receives follow in the same way, with an additional random pause after
    each join.

    Args:
        process_num: number of rounds.
        timeout, local, remote: accepted but not used by this function.
    """
    senders = []
    receivers = []
    sizes = {"small", "medium", "empty"}
    for index in range(process_num):
        for size in sizes:
            remote_file = get_remote_path(f"it_{size}_mix_{index}")
            cmd_send = f"file send {get_local_path(size)} {remote_file}"
            cmd_recv = f"file recv {remote_file} {get_local_path(f'recv_{size}_mix_{index}')}"
            senders.append(Process(target=check_hdc_cmd, args=(cmd_send, )))
            receivers.append(Process(target=check_hdc_cmd, args=(cmd_recv, )))
            logging.info(f"RESULT:{cmd_send}")  # log the prepared send command

    for worker in senders:
        pause = random.uniform(0, 1)
        worker.start()
        time.sleep(pause)
    for worker in senders:
        worker.join()

    for worker in receivers:
        pause = random.uniform(0, 1)
        worker.start()
        time.sleep(pause)
    for worker in receivers:
        worker.join()
        time.sleep(random.uniform(0, 1))
826
827
def execute_lines_in_file(file_path):
    """Run the commands listed in a ``count,command`` text file.

    A file containing ``1,hdc shell ls`` is created when ``file_path`` does
    not exist.  Every line gives a repeat count and, after the first comma, a
    command (anything after a second comma is ignored, as before).  A command
    starting with ``hdc`` has that prefix replaced by ``GP.hdc_head``; only
    the prefix is replaced now, whereas every occurrence of "hdc" inside the
    command used to be removed.  Blank lines and lines without a comma are
    skipped instead of raising IndexError.

    Raises:
        ValueError: a repeat count is not an integer.
        subprocess.CalledProcessError: a command exited with non-zero status.
    """
    if not os.path.exists(file_path):
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(file_path, flags, modes), 'w') as file:
            file.write("1,hdc shell ls")
    with open(file_path, 'r') as file:
        lines = file.readlines()
    for line in lines:
        if not line.strip():
            continue
        fields = line.split(',')
        if len(fields) < 2:
            logging.warning(f"skip malformed line: {line.strip()}")
            continue
        test_time = fields[0]
        test_cmd = fields[1]
        if test_cmd.startswith("hdc"):
            result = test_cmd[len("hdc"):].lstrip()
            test_cmd = f"{GP.hdc_head} {result}"

        for i in range(int(test_time)):
            logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}")
            output = subprocess.check_output(test_cmd.split()).decode()
            logging.info(f"RESULT:{output}")  # log the command output
849
850
def make_multiprocess_file(local, remote, mode, num, task_type):
    """Run ``num`` concurrent hdc file transfers and verify the results.

    Changes compared with the previous implementation: an unknown
    ``task_type`` returns False instead of raising NameError; the processes
    are drained with ``communicate()`` rather than polled in a busy loop
    while their pipes fill up; all processes are waited for even when one
    fails; and the directory receive check no longer appends the directory
    name to ``local`` once per iteration.

    Args:
        local: local file or directory.
        remote: remote file or directory.
        mode: "send" or "recv".
        num: number of parallel transfers (must be >= 1).
        task_type: "file" (every transfer uses its own ``_<i>`` suffixed
            target) or "dir" (all transfers use the same paths).

    Returns:
        True when every transfer printed "FileTransfer finish" and every
        content check passed, False otherwise (including invalid arguments).
    """
    if num < 1:
        return False
    if mode not in ("send", "recv") or task_type not in ("file", "dir"):
        return False

    if task_type == "file":
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
    else:
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]

    print(file_list[0])
    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list]
    print(f"{mode} target {num} start")

    # All processes are already running in parallel; communicate() drains the
    # pipes of each one in turn and waits for it to exit.
    all_finished = True
    for p in p_list:
        stdout, stderr = p.communicate()
        if stderr:
            print(f"{stderr.decode()}")
        if stdout:
            print(f"{stdout.decode()}")
        if stdout.decode().find("FileTransfer finish") == -1:
            all_finished = False
    if not all_finished:
        return False

    results = []
    if task_type == "file":
        for i in range(num):
            if mode == "send":
                results.append(_check_file(local, f"{remote}_{i}"))
            else:
                results.append(_check_file(f"{local}_{i}", f"{remote}_{i}"))
    else:
        # every transfer used the same paths, so one comparison covers them
        if mode == "send":
            end_of_file_name = os.path.basename(local)
            results.append(check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True))
        else:
            end_of_file_name = os.path.basename(remote)
            received_dir = os.path.join(local, end_of_file_name)
            results.append(check_dir(received_dir, f"{remote}", is_single_dir=True))
    return all(results)
911
912
def hdc_get_key(cmd):
    """Run ``{GP.hdc_head} {cmd}`` and return its decoded standard output.

    The command line is split on whitespace before execution. A non-zero
    exit status surfaces as ``subprocess.CalledProcessError``.
    """
    argv = f"{GP.hdc_head} {cmd}".split()
    return subprocess.check_output(argv).decode()
917
918
def start_subprocess_cmd(cmd, num, assert_out):
    """Run the same hdc command in ``num`` parallel processes.

    Args:
        cmd: hdc sub-command; it is prefixed with ``GP.hdc_head`` and split
            on whitespace.
        num: Number of processes to start; must be at least 1.
        assert_out: Text every process must print on stdout, or None to
            skip the output check.

    Returns:
        bool: False when ``num < 1``, when any process had to be killed
        after the 512s wait, or when ``assert_out`` is given and missing
        from any process's stdout; True otherwise.
    """
    if num < 1:
        return False
    cmd_list = [f"{GP.hdc_head} {cmd}" for _ in range(num)]
    # All children are started before any of them is waited on, so they
    # still run concurrently.
    p_list = [
        subprocess.Popen(full_cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for full_cmd in cmd_list
    ]
    logging.info(f"{cmd} target {num} start")

    # communicate() drains the pipes while waiting, replacing the busy poll
    # loop that also removed items from the list it was iterating. Every
    # child is reaped, even after a failure, so none is left running.
    all_ok = True
    for p in p_list:
        try:
            stdout, stderr = p.communicate(timeout=512)  # wait at most 512s per process
        except subprocess.TimeoutExpired:
            p.kill()
            stdout, stderr = p.communicate()
            all_ok = False
        if stderr:
            logging.error(f"{stderr.decode()}")
        if stdout:
            logging.info(f"{stdout.decode()}")
        if assert_out is not None and stdout.decode().find(assert_out) == -1:
            all_ok = False
    return all_ok
937
938
def check_hdc_version(cmd, version):
    """Check that the local hdc reports a version of at least ``version``.

    Args:
        cmd: Ignored; the function always runs ``{GP.hdc_head} -v``. Kept
            so existing callers keep working.
        version: Minimum version text. It must contain ``"Ver: "`` followed
            by dot-separated groups of hex digits. None means there is
            nothing to compare against.

    Returns:
        bool: True when the expected version is less than or equal to the
        version reported by hdc. False when it is greater, or when
        ``version`` is None.

    Raises:
        IndexError: a version text does not contain ``"Ver: "``.
        ValueError: the text after ``"Ver: "`` is not hex digits and dots.
        subprocess.CalledProcessError: the hdc command exits non-zero.
    """

    def _convert_version_to_hex(_version):
        # Drop the dots after "Ver: " and read what is left as one hex number.
        parts = _version.split("Ver: ")[1].split('.')
        hex_version = ''.join(parts)
        return int(hex_version, 16)

    # Guard before parsing: the original parsed `version` first, so a None
    # argument raised AttributeError and the None check below was dead code.
    if version is None:
        return False

    expected_version = _convert_version_to_hex(version)
    cmd = f"{GP.hdc_head} -v"
    print(f"\nexecuting command: {cmd}")
    # Strip line endings so the output parses the same way as `version`.
    output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
    real_version = _convert_version_to_hex(output)
    print(f"--> output: {output}")
    print(f"--> your local [{version}] is"
        f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
    )
    return expected_version <= real_version
957
958
def check_cmd_time(cmd, pattern, duration, times):
    """Run an hdc command repeatedly and check its average wall-clock time.

    Args:
        cmd: hdc sub-command (without the ``GP.hdc_head`` prefix).
        pattern: When None the command is only executed; otherwise each run
            goes through ``check_shell(cmd, pattern, fetch=False)`` and must
            succeed.
        duration: Upper bound for the average time in milliseconds; None
            selects the default of ``150 * 1.2``.
        times: Number of runs; must be at least 1.

    Returns:
        bool: False when ``times < 1``; otherwise whether the average time
        per run is strictly below ``duration``.

    Raises:
        AssertionError: ``pattern`` is given and a run fails ``check_shell``.
        subprocess.CalledProcessError: ``pattern`` is None and the command
            exits non-zero.
    """
    if times < 1:
        print("times should be bigger than 0.")
        return False
    start_time = time.time() * 1000
    print(f"{cmd} start {start_time}")
    res = []
    for _ in range(times):
        start_in = time.time() * 1000
        if pattern is None:
            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
        elif not check_shell(cmd, pattern, fetch=False):
            # Raised explicitly (same exception type as before) so the
            # command still runs and is still checked under `python -O`,
            # which strips assert statements.
            raise AssertionError(f"pattern [{pattern}] not matched by {GP.hdc_head} {cmd}")
        start_out = time.time() * 1000
        res.append(start_out - start_in)

    # Compute the maximum, minimum and median of the per-run durations.
    max_value = max(res)
    min_value = min(res)
    ordered = sorted(res)
    middle = len(ordered) // 2
    if len(ordered) % 2:
        median_value = ordered[middle]
    else:
        # Even sample count: the median is the mean of the two middle values.
        median_value = (ordered[middle - 1] + ordered[middle]) / 2

    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")

    end_time = time.time() * 1000

    # `times` is at least 1 here, so the division cannot fail. The old
    # ZeroDivisionError handler was dead code and would have left `timecost`
    # unbound for the return below.
    timecost = int(end_time - start_time) / times
    print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")

    if duration is None:
        duration = 150 * 1.2
    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
    return timecost < duration
1000
1001
def check_rom(baseline):
    """Compare the on-device size of hdcd plus libhdc against a baseline.

    Runs ``du`` through hdc for ``system/bin/hdcd`` and for
    ``system/lib/libhdc.dylib.so``. It falls back to the ``system/lib64``
    copy when the first library report contains "directory" (presumably a
    "No such file or directory" message; verify against device output).
    The outcome is also stored in ``GP.hdcd_rom``: ``"error"`` when the
    total is negative, otherwise ``"<total> KB"``.

    Args:
        baseline: Size limit; None selects 2200.

    Returns:
        bool: True when the total is non-negative and not above the limit.
    """

    def _du(path):
        # Run `du <path>` on the device and return the decoded report.
        return subprocess.check_output(f"{GP.hdc_head} shell du {path}".split()).decode()

    def _parse_size(message):
        # The size is the first tab-separated field of the report.
        try:
            return int(message.split('\t')[0])
        except ValueError:
            print(f"try get size value error, from {message}")
            return -9999 * 1024  # error size: large enough to make the total negative

    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
    limit = 2200 if baseline is None else baseline

    hdcd_size = _parse_size(_du("system/bin/hdcd"))

    lib_report = _du("system/lib/libhdc.dylib.so")
    if "directory" not in lib_report:
        libhdc_size = _parse_size(lib_report)
    else:
        lib64_report = _du("system/lib64/libhdc.dylib.so")
        # Neither library path reported a size: count the library as 0.
        libhdc_size = 0 if "directory" in lib64_report else _parse_size(lib64_report)

    total = hdcd_size + libhdc_size
    if total < 0:
        GP.hdcd_rom = "error"
        return False
    GP.hdcd_rom = f"{total} KB"

    if total > limit:
        print(f"rom size is {total}, overlimit baseline {limit}")
        return False
    print(f"rom size is {total}, underlimit baseline {limit}")
    return True
1042
1043
def run_command_with_timeout(command, timeout):
    """Run ``command`` with a time limit and return ``(stdout, stderr)`` text.

    Args:
        command: Command passed unchanged to ``subprocess.run``.
        timeout: Time limit in seconds.

    Returns:
        tuple: ``(stdout, stderr)`` decoded to ``str`` on success;
        ``(None, "Command timed out")`` when the limit is exceeded;
        ``(None, stderr)`` when the command exits with a non-zero status.
    """
    try:
        completed = subprocess.run(command, check=True, capture_output=True, timeout=timeout)
    except subprocess.CalledProcessError as err:
        return None, err.stderr.decode()
    except subprocess.TimeoutExpired:
        return None, "Command timed out"
    out_text = completed.stdout.decode()
    err_text = completed.stderr.decode()
    return out_text, err_text
1052
1053
def check_cmd_block(command, pattern, timeout=600):
    """Run ``command`` and report whether ``pattern`` appears in its stdout.

    Args:
        command: Command line; it is split on whitespace before execution.
        pattern: Text searched for in the captured standard output.
        timeout: Seconds to wait for the process before it is terminated
            and killed; whatever output was produced is still checked.

    Returns:
        bool: True when ``pattern`` is found in the captured stdout.
    """
    # Start the child process with text-mode pipes.
    child = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    try:
        # Collect the child's output; stderr is captured but not used.
        captured, _ = child.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Stop the child, then collect what it managed to print.
        child.terminate()
        child.kill()
        captured, _ = child.communicate(timeout=timeout)

    print(f"--> output: {captured}")
    return pattern in captured
1074