• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2021 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# 0. 运行环境: python 3.10+, pytest
17# 1. 修改 GP 中字段
18# 2. pytest [-k case_name_pattern]
19#    eg. pytest -k file 执行方法名含 file 的用例
20
21# 打包
22# pip install pyinstaller
23# prepare assert source dir includes your data files
24# pyi-makespec  -D --add-data assert:assert dev_hdc_test.py
25# pyinstaller dev_hdc_test.spec
26# 执行 dev_hdc_test.exe
27
28import hashlib
29import json
30import os
31import shutil
32import subprocess
33import sys
34import time
35import csv
36import pytest
37import pkg_resources
38
39
40class GP(object):
41    """ Global Parameters
42
43    customize here !!!
44    """
45    hdc_exe = "hdc"
46    local_path = "resource"
47    remote_path = "data/local/tmp"
48    remote_ip = "auto"
49    remote_port = 8710
50    hdc_head = "hdc"
51    device_name = ""
52    targets = []
53    tmode = "usb"
54    changed_testcase = "n"
55    testcase_path = "ts_windows.csv"
56    loaded_testcase = 0
57
58    @classmethod
59    def init(cls):
60        cls.list_targets()
61        if os.path.exists(".hdctester.conf"):
62            cls.load()
63            return
64        else:
65            cls.set_options()
66            cls.print_options()
67            cls.dump()
68        return
69
70
71    @classmethod
72    def list_targets(cls):
73        try:
74            targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
75        except (OSError, IndexError):
76            targets = [b"failed to auto detect device"]
77            cls.targets = [targets[0].decode()]
78            return False
79        cls.targets = [t.decode() for t in targets]
80        return True
81
82
83    @classmethod
84    def get_device(cls):
85        if len(cls.targets) > 1:
86            print("Multiple device detected, please select one:")
87            for i, t in enumerate(cls.targets):
88                print(f"{i+1}. {t}")
89            print("input the nums of the device above:")
90            cls.device_name = cls.targets[int(input()) - 1]
91        else:
92            cls.device_name = cls.targets[0]
93        if cls.device_name == "failed to auto detect device":
94            print("No device detected, please check your device connection")
95            return False
96        elif cls.device_name == "[empty]":
97            print("No hdc device detected.")
98            return False
99        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
100        return True
101
102
103    @classmethod
104    def dump(cls):
105        try:
106            os.remove(".hdctester.conf")
107        except OSError:
108            pass
109        content = filter(
110            lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items())
111        json_str = json.dumps(dict(content))
112        fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755)
113        os.write(fd, json_str.encode())
114        os.close(fd)
115        return True
116
117
118    @classmethod
119    def load(cls):
120        with open(".hdctester.conf") as f:
121            content = json.load(f)
122            cls.hdc_exe = content.get("hdc_exe")
123            cls.local_path = content.get("local_path")
124            cls.remote_path = content.get("remote_path")
125            cls.remote_ip = content.get("remote_ip")
126            cls.hdc_head = content.get("hdc_head")
127            cls.tmode = content.get("tmode")
128            cls.device_name = content.get("device_name")
129            cls.changed_testcase = content.get("changed_testcase")
130            cls.testcase_path = content.get("testcase_path")
131            cls.loaded_testcase = content.get("load_testcase")
132        return True
133
134
135    @classmethod
136    def print_options(cls):
137        info = "HDC Tester Default Options: \n\n" \
138        + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \
139        + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \
140        + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \
141        + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \
142        + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \
143        + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \
144        + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \
145        + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \
146        + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \
147        + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \
148        + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n"
149        print(info)
150
151
152    @classmethod
153    def tconn_tcp(cls):
154        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
155        if "Connect OK" in res:
156            return True
157        else:
158            return False
159
160
161    @classmethod
162    def set_options(cls):
163        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
164            cls.hdc_exe = opt
165        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
166            cls.local_path = opt
167        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
168            cls.remote_path = opt
169        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
170            cls.remote_ip = opt
171        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
172            cls.remote_port = int(opt)
173        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
174            cls.device_name = opt
175        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
176            cls.tmode = opt
177        if cls.tmode == "usb":
178            ret = cls.get_device()
179            if ret:
180                print("USB device detected.")
181        elif cls.tconn_tcp():
182            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
183        else:
184            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
185            return False
186        return True
187
188
189    @classmethod
190    def change_testcase(cls):
191        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
192            cls.changed_testcase = opt
193            if opt == "n":
194                return False
195        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
196            cls.testcase_path = os.path.join(opt)
197        cls.print_options()
198        return True
199
200
201    @classmethod
202    def load_testcase(cls):
203        print("this fuction will coming soon.")
204        return False
205
206
207def rmdir(path):
208    try:
209        if sys.platform == "win32":
210            if os.path.isfile(path):
211                subprocess.call(f"del {path}".split())
212            else:
213                subprocess.call(f"del /Q {path}".split())
214        else:
215            subprocess.call(f"rm -rf {path}".split())
216    except OSError:
217        pass
218
219
220def get_local_path(path):
221    return os.path.join(GP.local_path, path)
222
223
224def get_remote_path(path):
225    return f"{GP.remote_path}/{path}"
226
227
228def _get_local_md5(local):
229    md5_hash = hashlib.md5()
230    with open(local, "rb") as f:
231        for byte_block in iter(lambda: f.read(4096), b""):
232            md5_hash.update(byte_block)
233    return md5_hash.hexdigest()
234
235
236def check_shell(cmd, pattern=None, fetch=False):
237    cmd = f"{GP.hdc_head} {cmd}"
238    print(f"\nexecuting command: {cmd}")
239    if pattern: # check output valid
240        output = subprocess.check_output(cmd.split()).decode()
241        res = pattern in output
242        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
243        return res
244    elif fetch:
245        output = subprocess.check_output(cmd.split()).decode()
246        print(f"--> output: {output}")
247        return output
248    else: # check cmd run successfully
249        return subprocess.check_call(cmd.split()) == 0
250
251
252def _check_file(local, remote):
253    cmd = f"shell md5sum {remote}"
254    local_md5 = _get_local_md5(local)
255    return check_shell(cmd, local_md5)
256
257
258def _check_app_installed(bundle, is_shared=False):
259    dump = "dump-shared" if is_shared else "dump"
260    cmd = f"shell bm {dump} -a"
261    return check_shell(cmd, bundle)
262
263
264def check_hdc_targets():
265    cmd = f"{GP.hdc_head} list targets"
266    print(GP.device_name)
267    return check_shell(cmd, GP.device_name)
268
269
270def check_file_send(local, remote):
271    local_path = os.path.join(GP.local_path, local)
272    remote_path = f"{GP.remote_path}/{remote}"
273    cmd = f"file send {local_path} {remote_path}"
274    return check_shell(cmd) and _check_file(local_path, remote_path)
275
276
277def check_file_recv(remote, local):
278    local_path = os.path.join(GP.local_path, local)
279    remote_path = f"{GP.remote_path}/{remote}"
280    cmd = f"file recv {remote_path} {local_path}"
281    return check_shell(cmd) and _check_file(local_path, remote_path)
282
283
284def check_app_install(app, bundle, args=""):
285    app = os.path.join(GP.local_path, app)
286    install_cmd = f"install {args} {app}"
287    return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)
288
289
290def check_app_uninstall(bundle, args=""):
291    uninstall_cmd = f"uninstall {args} {bundle}"
292    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)
293
294
295def check_hdc_cmd(cmd, pattern=None, **args):
296    if cmd.startswith("file"):
297        if not check_shell(cmd, "FileTransfer finish"):
298            return False
299        if cmd.startswith("file send"):
300            local, remote = cmd.split()[-2:]
301        else:
302            remote, local = cmd.split()[-2:]
303        return _check_file(local, remote)
304
305    elif cmd.startswith("install"):
306        bundle = args.get("bundle", "invalid")
307        opt = " ".join(cmd.split()[1:-1])
308        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)
309
310    elif cmd.startswith("uninstall"):
311        bundle = cmd.split()[-1]
312        opt = " ".join(cmd.split()[1:-1])
313        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)
314
315    else:
316        return check_shell(cmd, pattern, **args)
317
318
319def switch_usb():
320    res = check_hdc_cmd("tmode usb")
321    time.sleep(3)
322    if res:
323        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
324    return res
325
326
327def switch_tcp():
328    if not GP.remote_ip: # skip tcp check
329        print("!!! remote_ip is none, skip tcp check !!!")
330        return True
331    if GP.remote_ip == "auto":
332        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
333        if not ipconf:
334            print("!!! device ip not found, skip tcp check !!!")
335            return True
336        GP.remote_ip = ipconf.split(":")[1].split()[0]
337        print(f"fetch remote ip: {GP.remote_ip}")
338    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
339    if ret:
340        time.sleep(3)
341    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
342    if res:
343        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
344    return res
345
346
347def select_cmd():
348    msg = "1) Proceed tester\n" \
349        + "2) Customize tester\n" \
350        + "3) Setup files for transfer\n" \
351        + "4) Load custom testcase(default unused) \n" \
352        + "5) Exit\n" \
353        + ">> "
354
355    while True:
356        opt = input(msg).strip()
357        if len(opt) == 1 and '1' <= opt <= '5':
358            return opt
359
360
361def prepare_source():
362
363    def gen_file(path, size):
364        index = 0
365        path = os.path.abspath(path)
366        fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
367
368        while index < size:
369            os.write(fd, os.urandom(1024))
370            index += 1024
371        os.close(fd)
372
373    print(f"in prepare {GP.local_path},wait for 2 mins.")
374    current_path = os.getcwd()
375    os.mkdir(GP.local_path)
376    print("generating empty file ...")
377    gen_file(os.path.join(GP.local_path, "empty"), 0)
378
379    print("generating small file ...")
380    gen_file(os.path.join(GP.local_path, "small"), 102400)
381
382    print("generating large file ...")
383    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)
384
385    print("generating dir with small file ...")
386    dir_path = os.path.join(GP.local_path, "normal_dir")
387    rmdir(dir_path)
388    os.mkdir(dir_path)
389    gen_file(os.path.join(dir_path, "small2"), 102400)
390
391    print("generating empty dir ...")
392    dir_path = os.path.join(GP.local_path, "empty_dir")
393    rmdir(dir_path)
394    os.mkdir(dir_path)
395
396    if os.path.exists("entry-default-signed-debug.hap"):
397        print("copy the hap file to resource dir...")
398        shutil.copy("entry-default-signed-debug.hap", GP.local_path)
399    else:
400        print("No hap File!")
401    if os.path.exists("panalyticshsp-default-signed.hsp"):
402        shutil.copy("panalyticshsp-default-signed.hsp", GP.local_path)
403    else:
404        print("No hsp File!")
405
406
407def setup_tester():
408    while True:
409        GP.print_options()
410        opt = int(select_cmd())
411        if opt == 1:
412            return True
413        elif opt == 2:
414            if not GP.set_options():
415                return False
416            GP.dump()
417        elif opt == 3:
418            prepare_source()
419        elif opt == 4:
420            if not GP.load_testcase():
421                return False
422        elif opt == 5:
423            return False
424        else:
425            return False
426
427
428def load_testcase():
429    if not GP.load_testcase:
430        print("load testcase failed")
431        return False
432    print("load testcase success")
433    return True
434
435
436def check_library_installation(library_name):
437    try:
438        pkg_resources.get_distribution(library_name)
439        return 0
440    except pkg_resources.DistributionNotFound:
441        print(f"\n\n{library_name} is not installed.\n\n")
442        print(f"try to use command below:")
443        print(f"pip install {library_name}")
444        return 1