• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# 运行环境: python 3.10+, pytest, pytest-repeat, pytest-testreport
16# 准备文件:package.zip
17# pip install pytest pytest-testreport pytest-repeat
18# python hdc_normal_test.py
19
20
21import argparse
22import time
23import os
24import multiprocessing
25
26import pytest
27
28from dev_hdc_test import GP
29from dev_hdc_test import check_library_installation, check_hdc_version, check_cmd_time
30from dev_hdc_test import check_hdc_cmd, check_hdc_targets, get_local_path, get_remote_path, run_command_with_timeout
31from dev_hdc_test import check_app_install, check_app_uninstall, prepare_source, pytest_run, update_source, check_rate
32from dev_hdc_test import make_multiprocess_file, rmdir, get_shell_result
33from dev_hdc_test import check_app_install_multi, check_app_uninstall_multi
34from dev_hdc_test import check_rom, check_shell, check_shell_any_device, check_cmd_block
35
36
37def test_hdc_server_foreground():
38    port = os.getenv('OHOS_HDC_SERVER_PORT')
39    if port is None:
40        port = 8710
41    assert check_hdc_cmd("kill", "Kill server finish")
42    assert check_cmd_block(f"{GP.hdc_exe} -m", f"port: {port}", timeout=5)
43    assert check_hdc_cmd("start")
44    time.sleep(3) # sleep 3s to wait for the device to connect channel
45
46
47def test_list_targets():
48    assert check_hdc_targets()
49    assert check_hdc_cmd("shell rm -rf data/local/tmp/it_*")
50    assert check_hdc_cmd("shell mkdir data/local/tmp/it_send_dir")
51
52
53def test_usb_disconnect():
54    assert check_hdc_targets()
55    cmd = 'shell "kill -9 `pidof hdcd`"'
56    check_hdc_cmd(f"{cmd}", "[Fail][E001003] USB communication abnormal, please check the USB communication link.")
57    time.sleep(2)
58    assert check_hdc_targets()
59
60
61def test_list_targets_multi_usb_device():
62    devices_str = check_shell_any_device(f"{GP.hdc_exe} list targets", None, True)
63    time.sleep(3) # sleep 3s to wait for the device to connect channel
64    devices_array = devices_str[1].split('\n')
65    if devices_array:
66        for device in devices_array:
67            if len(device) > 8:
68                assert check_shell_any_device(f"{GP.hdc_exe} -t {device} shell id", "u:r:")[0]
69
70
71@pytest.mark.repeat(5)
72def test_empty_file():
73    assert check_hdc_cmd(f"file send {get_local_path('empty')} {get_remote_path('it_empty')}")
74    assert check_hdc_cmd(f"file recv {get_remote_path('it_empty')} {get_local_path('empty_recv')}")
75
76
77@pytest.mark.repeat(5)
78def test_empty_dir():
79    assert check_shell(f"file send {get_local_path('empty_dir')} {get_remote_path('it_empty_dir')}",
80        "the source folder is empty")
81    assert check_hdc_cmd("shell mkdir data/local/tmp/it_empty_dir_recv")
82    assert check_shell(f"file recv {get_remote_path('it_empty_dir_recv')} {get_local_path('empty_dir_recv')}",
83        "the source folder is empty")
84
85
86@pytest.mark.repeat(5)
87def test_long_path():
88    assert check_hdc_cmd(f"file send {get_local_path('deep_test_dir')} {get_remote_path('it_send_dir')}",
89                         is_single_dir=False)
90    assert check_hdc_cmd(f"file recv {get_remote_path('it_send_dir/deep_test_dir')} {get_local_path('recv_test_dir')}",
91                         is_single_dir=False)
92
93
94@pytest.mark.repeat(3)
95def test_no_exist_path():
96    test_resource_list = ['empty', 'medium', 'small', 'problem_dir']
97    remote_unexist_path = f"{get_remote_path('it_no_exist/deep_test_dir/')}"
98    for test_item in test_resource_list:
99        check_hdc_cmd(f"shell rm -rf {get_remote_path('it_no_exist*')}")
100        local_unexist_path = get_local_path(f'{(os.path.join("recv_no_exist", "deep_test_dir", f"recv_{test_item}"))}')
101        if os.path.exists(get_local_path('recv_no_exist')):
102            rmdir(get_local_path('recv_no_exist'))
103        assert check_hdc_cmd(f"file send "
104                             f"{get_local_path(f'{test_item}')} {remote_unexist_path}/it_{test_item}")
105        assert check_hdc_cmd(f"file recv {remote_unexist_path}/it_{test_item} "
106                             f"{local_unexist_path}")
107
108
109@pytest.mark.repeat(3)
110def test_no_exist_path_with_seperate():
111    test_resource_list = ['empty', 'medium', 'small']
112    remote_unexist_path = f"{get_remote_path('it_no_exist/deep_test_dir/')}"
113    for test_item in test_resource_list:
114        check_hdc_cmd(f"shell rm -rf {get_remote_path('it_no_exist*')}")
115        local_unexist_path = get_local_path(f'{(os.path.join("recv_no_exist", "deep_test_dir"))}')
116        if os.path.exists(get_local_path('recv_no_exist')):
117            rmdir(get_local_path('recv_no_exist'))
118        assert check_hdc_cmd(f"file send "
119                             f"{get_local_path(f'{test_item}')} {remote_unexist_path}/")
120        assert check_hdc_cmd(f"file recv {remote_unexist_path}/{test_item} "
121                             f"{local_unexist_path}{os.sep}")
122
123
124@pytest.mark.repeat(3)
125def test_no_exist_bundle_path():
126    # bundle test
127    version = "Ver: 3.1.0e"
128    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
129        print("version does not match, ignore this case")
130    else:
131        data_storage_el2_path = "data/storage/el2/base"
132        remote_unexist_path = f"{data_storage_el2_path}/it_no_exist/deep_test_dir/"
133        check_shell(f"shell mkdir -p mnt/debug/100/debug_hap/{GP.debug_app}/{data_storage_el2_path}")
134        # file send & recv test
135        test_resource_list = ['empty', 'medium', 'small', 'problem_dir']
136        for test_item in test_resource_list:
137            check_hdc_cmd(f"shell -b {GP.debug_app} rm -rf {data_storage_el2_path}/it_no_exist/")
138            local_unexist_path = get_local_path(
139                f'{(os.path.join("recv_no_exist", "deep_test_dir", f"recv_{test_item}"))}')
140            if os.path.exists(get_local_path('recv_no_exist')):
141                rmdir(get_local_path('recv_no_exist'))
142
143            assert check_hdc_cmd(f"shell -b {GP.debug_app} ls {data_storage_el2_path}/it_no_exist",
144                                 "No such file or directory")
145            assert not os.path.exists(get_local_path('recv_no_exist'))
146
147            assert check_hdc_cmd(f"file send -b {GP.debug_app} "
148                                f"{get_local_path(f'{test_item}')} {remote_unexist_path}/it_{test_item}")
149            assert check_hdc_cmd(f"file recv  -b {GP.debug_app} {remote_unexist_path}/it_{test_item} "
150                                f"{local_unexist_path}")
151
152
153@pytest.mark.repeat(5)
154def test_small_file():
155    assert check_hdc_cmd(f"file send {get_local_path('small')} {get_remote_path('it_small')}")
156    assert check_hdc_cmd(f"file recv {get_remote_path('it_small')} {get_local_path('small_recv')}")
157
158
159@pytest.mark.repeat(1)
160def test_node_file():
161    assert check_hdc_cmd(f"file recv {get_remote_path('../../../sys/power/state')} {get_local_path('state')}")
162    assert check_hdc_cmd(f"file recv {get_remote_path('../../../sys/firmware/fdt')} {get_local_path('fdt')}")
163
164
165@pytest.mark.repeat(1)
166def test_medium_file():
167    assert check_hdc_cmd(f"file send {get_local_path('medium')} {get_remote_path('it_medium')}")
168    assert check_hdc_cmd(f"file recv {get_remote_path('it_medium')} {get_local_path('medium_recv')}")
169
170
171@pytest.mark.repeat(1)
172def test_large_file():
173    assert check_hdc_cmd(f"file send {get_local_path('large')} {get_remote_path('it_large')}")
174    assert check_hdc_cmd(f"file recv {get_remote_path('it_large')} {get_local_path('large_recv')}")
175
176
177@pytest.mark.repeat(1)
178def test_running_file():
179    assert check_hdc_cmd(f"file recv system/bin/hdcd {get_local_path('running_recv')}")
180
181
182@pytest.mark.repeat(1)
183def test_rate():
184    assert check_rate(f"file send {get_local_path('large')} {get_remote_path('it_large')}", 18000)
185    assert check_rate(f"file recv {get_remote_path('it_large')} {get_local_path('large_recv')}", 18000)
186
187
188@pytest.mark.repeat(1)
189def test_file_error():
190    assert check_hdc_cmd("target mount")
191    assert check_shell(
192        f"file send {get_local_path('small')} system/bin/hdcd",
193        "busy"
194    )
195    assert check_shell(
196        f"file recv",
197        "[Fail]There is no local and remote path"
198    )
199    assert check_shell(
200        f"file send",
201        "[Fail]There is no local and remote path"
202    )
203    assert check_hdc_cmd(f"shell rm -rf {get_remote_path('../../../large')}")
204    assert check_hdc_cmd(f"shell param set persist.hdc.control.file false")
205    assert check_shell(
206        f"file send {get_local_path('small')} {get_remote_path('it_small_false')}",
207        "debugging is not allowed"
208    )
209    assert check_hdc_cmd(f"shell param set persist.hdc.control.file true")
210    assert check_hdc_cmd(f"file send {get_local_path('small')} {get_remote_path('it_small_true')}")
211
212
213@pytest.mark.repeat(5)
214def test_recv_dir():
215    if os.path.exists(get_local_path('it_problem_dir')):
216        rmdir(get_local_path('it_problem_dir'))
217    assert check_hdc_cmd(f"shell rm -rf {get_remote_path('it_problem_dir')}")
218    assert check_hdc_cmd(f"shell rm -rf {get_remote_path('problem_dir')}")
219    assert make_multiprocess_file(get_local_path('problem_dir'), get_remote_path(''), 'send', 1, "dir")
220    assert check_hdc_cmd(f"shell mv {get_remote_path('problem_dir')} {get_remote_path('it_problem_dir')}")
221    assert make_multiprocess_file(get_local_path(''), get_remote_path('it_problem_dir'), 'recv', 1, "dir")
222
223
224@pytest.mark.repeat(5)
225def test_hap_install():
226    assert check_hdc_cmd(f"install -r {get_local_path('entry-default-signed-debug.hap')}",
227                            bundle="com.hmos.diagnosis")
228
229
230@pytest.mark.repeat(5)
231def test_install_hap():
232    package_hap = "entry-default-signed-debug.hap"
233    app_name_default = "com.hmos.diagnosis"
234
235    # default
236    assert check_app_install(package_hap, app_name_default)
237    assert check_app_uninstall(app_name_default)
238
239    # -r
240    assert check_app_install(package_hap, app_name_default, "-r")
241    assert check_app_uninstall(app_name_default)
242
243    # -k
244    assert check_app_install(package_hap, app_name_default, "-r")
245    assert check_app_uninstall(app_name_default, "-k")
246
247    # -s
248    assert check_app_install(package_hap, app_name_default, "-s")
249
250
251@pytest.mark.repeat(5)
252def test_install_hsp():
253    package_hsp = "libA_v10001.hsp"
254    hsp_name_default = "com.example.liba"
255
256    assert check_app_install(package_hsp, hsp_name_default, "-s")
257    assert check_app_uninstall(hsp_name_default, "-s")
258    assert check_app_install(package_hsp, hsp_name_default)
259
260
261@pytest.mark.repeat(5)
262def test_install_multi_hap():
263    # default multi hap
264    tables = {
265        "entry-default-signed-debug.hap" : "com.hmos.diagnosis",
266        "ActsAudioRecorderJsTest.hap" : "ohos.acts.multimedia.audio.audiorecorder"
267    }
268    assert check_app_install_multi(tables)
269    assert check_app_uninstall_multi(tables)
270    assert check_app_install_multi(tables, "-s")
271
272    # default multi hap -r -k
273    tables = {
274        "entry-default-signed-debug.hap" : "com.hmos.diagnosis",
275        "ActsAudioRecorderJsTest.hap" : "ohos.acts.multimedia.audio.audiorecorder"
276    }
277    assert check_app_install_multi(tables, "-r")
278    assert check_app_uninstall_multi(tables, "-k")
279
280
281@pytest.mark.repeat(5)
282def test_install_multi_hsp():
283    # default multi hsp -s
284    tables = {
285        "libA_v10001.hsp" : "com.example.liba",
286        "libB_v10001.hsp" : "com.example.libb",
287    }
288    assert check_app_install_multi(tables, "-s")
289    assert check_app_uninstall_multi(tables, "-s")
290    assert check_app_install_multi(tables)
291
292
293@pytest.mark.repeat(5)
294def test_install_hsp_and_hap():
295    #default multi hsp and hsp
296    tables = {
297        "libA_v10001.hsp" : "com.example.liba",
298        "entry-default-signed-debug.hap" : "com.hmos.diagnosis",
299    }
300    assert check_app_install_multi(tables)
301    assert check_app_install_multi(tables, "-s")
302
303
304@pytest.mark.repeat(5)
305def test_install_dir():
306    package_haps_dir = "app_dir"
307    app_name_default = "com.hmos.diagnosis"
308    assert check_app_install(package_haps_dir, app_name_default)
309    assert check_app_uninstall(app_name_default)
310
311
312def test_server_kill():
313    assert check_hdc_cmd("kill", "Kill server finish")
314    assert check_hdc_cmd("start server")
315    time.sleep(3) # sleep 3s to wait for the device to connect channel
316    assert check_hdc_cmd("checkserver", "Ver")
317
318
319def test_target_cmd():
320    assert check_hdc_targets()
321    check_hdc_cmd("target boot")
322    start_time = time.time()
323    run_command_with_timeout(f"{GP.hdc_head} wait", 30) # reboot takes up to 30 seconds
324    time.sleep(3) # sleep 3s to wait for the device to boot
325    run_command_with_timeout(f"{GP.hdc_head} wait", 30) # reboot takes up to 30 seconds
326    end_time = time.time()
327    print(f"command exec time {end_time - start_time}")
328    time.sleep(3) # sleep 3s to wait for the device to connect channel
329    assert (end_time - start_time) > 8 # Reboot takes at least 8 seconds
330
331
332@pytest.mark.repeat(1)
333def test_file_switch_off():
334    assert check_hdc_cmd("shell param set persist.hdc.control.file false")
335    assert check_shell(f"shell param get persist.hdc.control.file", "false")
336    assert check_shell(f"file send {get_local_path('small')} {get_remote_path('it_small')}",
337                       "debugging is not allowed")
338    assert check_shell(f"file recv {get_remote_path('it_small')} {get_local_path('small_recv')}",
339                       "debugging is not allowed")
340
341
342@pytest.mark.repeat(1)
343def test_file_switch_on():
344    assert check_hdc_cmd("shell param set persist.hdc.control.file true")
345    assert check_shell(f"shell param get persist.hdc.control.file", "true")
346    assert check_hdc_cmd(f"file send {get_local_path('small')} {get_remote_path('it_small')}")
347    assert check_hdc_cmd(f"file recv {get_remote_path('it_small')} {get_local_path('small_recv')}")
348
349
350def test_target_mount():
351    assert (check_hdc_cmd("target mount", "Mount finish" or "[Fail]Operate need running as root"))
352    remount_vendor = get_shell_result(f'shell "mount |grep /vendor |head -1"')
353    print(remount_vendor)
354    assert "rw" in remount_vendor
355    remount_system = get_shell_result(f'shell "cat proc/mounts | grep /system |head -1"')
356    print(remount_system)
357    assert "rw" in remount_system
358
359
360def test_tmode_port():
361    assert (check_hdc_cmd("tmode port", "Set device run mode successful"))
362    time.sleep(3) # sleep 3s to wait for the device to connect channel
363    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
364    time.sleep(3) # sleep 3s to wait for the device to connect channel
365    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
366    assert (check_hdc_cmd("tmode port 12345"))
367    time.sleep(3) # sleep 3s to wait for the device to connect channel
368    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
369    time.sleep(3) # sleep 3s to wait for the device to connect channel
370    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
371    netstat_port = get_shell_result(f'shell "netstat -anp | grep 12345"')
372    print(netstat_port)
373    assert "LISTEN" in netstat_port
374    assert "hdcd" in netstat_port
375
376
377def test_tconn():
378    daemon_port = 58710
379    address = "127.0.0.1"
380    assert (check_hdc_cmd(f"tmode port {daemon_port}"))
381    time.sleep(3) # sleep 3s to wait for the device to connect channel
382    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
383    time.sleep(3) # sleep 3s to wait for the device to connect channel
384    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
385    assert check_hdc_cmd(f"shell param get persist.hdc.port", f"{daemon_port}")
386    assert check_hdc_cmd(f"fport tcp:{daemon_port} tcp:{daemon_port}", "Forwardport result:OK")
387    assert check_hdc_cmd(f"fport ls ", f"tcp:{daemon_port} tcp:{daemon_port}")
388    assert check_shell(f"tconn {address}:{daemon_port}", "Connect OK", head=GP.hdc_exe)
389    time.sleep(3)
390    assert check_shell("list targets", f"{address}:{daemon_port}", head=GP.hdc_exe)
391    tcp_head = f"{GP.hdc_exe} -t {address}:{daemon_port}"
392    assert check_hdc_cmd(f"file send {get_local_path('medium')} {get_remote_path('it_tcp_medium')}",
393        head=tcp_head)
394    assert check_hdc_cmd(f"file recv {get_remote_path('it_tcp_medium')} {get_local_path('medium_tcp_recv')}",
395        head=tcp_head)
396    assert check_shell(f"tconn {address}:{daemon_port} -remove", head=GP.hdc_exe)
397    assert not check_shell("list targets", f"{address}:{daemon_port}", head=GP.hdc_exe)
398    assert check_hdc_cmd(f"fport rm tcp:{daemon_port} tcp:{daemon_port}", "Remove forward ruler success")
399    assert not check_hdc_cmd(f"fport ls ", f"tcp:{daemon_port} tcp:{daemon_port}")
400
401
402def test_target_key():
403    device_key = get_shell_result(f"list targets").split("\r\n")[0]
404    hdcd_pid = get_shell_result(f"-t {device_key} shell pgrep -x hdcd").split("\r\n")[0]
405    assert hdcd_pid.isdigit()
406
407
408def test_version_cmd():
409    version = "Ver: 2.0.0a"
410    assert check_hdc_version("-v", version)
411    assert check_hdc_version("version", version)
412    assert check_hdc_version("checkserver", version)
413
414
415def test_fport_cmd_output():
416    local_port = 18070
417    remote_port = 11080
418    fport_arg = f"tcp:{local_port} tcp:{remote_port}"
419    assert check_hdc_cmd(f"fport {fport_arg}", "Forwardport result:OK")
420    assert check_shell_any_device(f"netstat -ano", "LISTENING", False)[0]
421    assert check_shell_any_device(f"netstat -ano", f"{local_port}", False)[0]
422    assert check_hdc_cmd(f"fport ls", fport_arg)
423    assert check_hdc_cmd(f"fport rm {fport_arg}", "success")
424
425
426def test_rport_cmd_output():
427    local_port = 17090
428    remote_port = 11080
429    rport_arg = f"tcp:{local_port} tcp:{remote_port}"
430    assert check_hdc_cmd(f"rport {rport_arg}", "Forwardport result:OK")
431    netstat_line = get_shell_result(f'shell "netstat -anp | grep {local_port}"')
432    assert "LISTEN" in netstat_line
433    assert "hdcd" in netstat_line
434    fport_list = get_shell_result(f"fport ls")
435    assert "Reverse" in fport_list
436    assert rport_arg in fport_list
437    assert check_hdc_cmd(f"fport rm {rport_arg}", "success")
438
439
440def test_fport_cmd():
441    fport_list = []
442    rport_list = []
443    start_port = 10000
444    end_port = 10020
445    for i in range(start_port, end_port):
446        fport = f"tcp:{i+100} tcp:{i+200}"
447        rport = f"tcp:{i+300} tcp:{i+400}"
448        localabs = f"tcp:{i+500} localabstract:{f'helloworld.com.app.{i+600}'}"
449        fport_list.append(fport)
450        rport_list.append(rport)
451        fport_list.append(localabs)
452
453    for fport in fport_list:
454        assert check_hdc_cmd(f"fport {fport}", "Forwardport result:OK")
455        assert check_hdc_cmd(f"fport {fport}", "TCP Port listen failed at")
456        assert check_hdc_cmd("fport ls", fport)
457
458    for fport in fport_list:
459        assert check_hdc_cmd(f"fport rm {fport}", "success")
460        assert not check_hdc_cmd("fport ls", fport)
461
462    for rport in rport_list:
463        assert check_hdc_cmd(f"rport {rport}", "Forwardport result:OK")
464        assert check_hdc_cmd(f"rport {rport}", "TCP Port listen failed at")
465        assert check_hdc_cmd("rport ls", rport) or check_hdc_cmd("fport ls", rport)
466
467    for rport in rport_list:
468        assert check_hdc_cmd(f"fport rm {rport}", "success")
469        assert not check_hdc_cmd("rport ls", fport) and not check_hdc_cmd("fport ls", fport)
470
471    task_str1 = "tcp:33333 tcp:33333"
472    assert check_hdc_cmd(f"fport {task_str1}", "Forwardport result:OK")
473    assert check_hdc_cmd(f"fport rm {task_str1}", "success")
474    assert check_hdc_cmd(f"fport {task_str1}", "Forwardport result:OK")
475    assert check_hdc_cmd(f"fport rm {task_str1}", "success")
476
477    task_str2 = "tcp:44444 tcp:44444"
478    assert check_hdc_cmd(f"rport {task_str2}", "Forwardport result:OK")
479    assert check_hdc_cmd(f"fport rm {task_str2}", "success")
480    assert check_hdc_cmd(f"rport {task_str2}", "Forwardport result:OK")
481    assert check_hdc_cmd(f"fport rm {task_str2}", "success")
482
483
484#子进程执行函数
485def new_process_run(cmd):
486    # 重定向 stdout 和 stderr 到 /dev/null
487    with open(os.devnull, 'w') as devnull:
488        old_stdout = os.dup2(devnull.fileno(), 1)  # 重定向 stdout
489        old_stderr = os.dup2(devnull.fileno(), 2)  # 重定向 stderr
490        try:
491            # 这里是子进程的代码,不会有任何输出到控制台
492            check_shell(f'{cmd}')
493        finally:
494            # 恢复原始的 stdout 和 stderr
495            os.dup2(old_stdout, 1)
496            os.dup2(old_stderr, 2)
497
498
499def test_hilog_exit_after_hdc_kill():
500    # 新开进程执行hdc shell hilog,防止阻塞主进程
501    p = multiprocessing.Process(target=new_process_run, args=("shell hilog",))
502    p.start()
503    time.sleep(1)
504    hilog_pid = get_shell_result(f'shell pidof hilog')
505    hilog_pid = hilog_pid.replace("\r\n", "")
506    assert hilog_pid.isdigit()
507    assert check_hdc_cmd(f'kill', "Kill server finish")
508    assert check_hdc_cmd("start")
509    time.sleep(3) # sleep 3s to wait for the device to connect channel
510    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
511    hilog_pid2 = get_shell_result(f'shell pidof hilog')
512    assert hilog_pid2 == ''
513    p.join()
514
515
516def test_shell_cmd_timecost():
517    assert check_cmd_time(
518        cmd="shell \"ps -ef | grep hdcd\"",
519        pattern="hdcd",
520        duration=None,
521        times=10)
522
523
524def test_shell_huge_cat():
525    assert check_hdc_cmd(f"file send {get_local_path('word_100M.txt')} {get_remote_path('it_word_100M.txt')}")
526    assert check_cmd_time(
527        cmd=f"shell cat {get_remote_path('it_word_100M.txt')}",
528        pattern=None,
529        duration=10000, # 10 seconds
530        times=10)
531
532
533def test_file_option_bundle_normal():
534    version = "Ver: 3.1.0e"
535    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
536        print("version does not match, ignore this case")
537    else:
538        data_storage_el2_path = "data/storage/el2/base"
539        check_shell(f"shell rm -rf mnt/debug/100/debug_hap/{GP.debug_app}/{data_storage_el2_path}/it*")
540        check_shell(f"shell mkdir -p mnt/debug/100/debug_hap/{GP.debug_app}/{data_storage_el2_path}")
541
542        # file send & recv test
543        test_resource_list = ['empty', 'medium', 'small', 'problem_dir']
544        for test_item in test_resource_list:
545            if test_item == 'problem_dir' and os.path.exists(get_local_path(f'recv_bundle_{test_item}')):
546                rmdir(get_local_path(f'recv_bundle_{test_item}'))
547
548            assert check_hdc_cmd(f"file send -b {GP.debug_app} "
549                                 f"{get_local_path(f'{test_item}')} {data_storage_el2_path}/it_{test_item}")
550            assert check_hdc_cmd(f"file recv -b {GP.debug_app} {data_storage_el2_path}/it_{test_item} "
551                                 f"{get_local_path(f'recv_bundle_{test_item}')} ")
552
553
554def test_file_option_bundle_error():
555    version = "Ver: 3.1.0e"
556    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
557        print("version does not match, ignore this case")
558    else:
559        data_storage_el2_path = "data/storage/el2/base"
560        is_user = check_shell("shell id", "uid=2000(shell)")
561        assert check_shell(f"file send -b {GP.debug_app} "
562                           f"{get_local_path('empty_dir')} {get_remote_path('it_empty_dir')}",
563                           "the source folder is empty")
564
565        assert check_shell(f"file send -b {GP.debug_app} ", "There is no local and remote path")
566        assert check_shell(f"file recv -b {GP.debug_app} ", "There is no local and remote path")
567
568        # 报错优先级
569        assert check_shell(f"file send -b ./{GP.debug_app} ", "There is no local and remote path")
570        assert check_shell(f"file recv -b ./{GP.debug_app} ", "There is no local and remote path")
571
572        # 报错优先级
573        assert check_shell(f"file recv -b ", "[E005003]")
574        assert check_shell(f"file send -b ", "[E005003]")
575
576        assert check_shell(f"file send -b ./{GP.debug_app} "
577                    f"{get_local_path('small')} {get_remote_path('it_small')}", "[E005101]")
578        assert check_shell(f"file recv -b ./{GP.debug_app} "
579                    f"{get_local_path('small')} {get_remote_path('it_small')}", "[E005101]")
580
581        # bundle不存在或不符合要求
582        assert check_shell(f"file send -b ./{GP.debug_app} "
583                           f"{get_local_path('small')} {get_remote_path('it_small')}", "[E005101]")
584        assert check_shell(f"file send -b com.AAAA "
585                           f"{get_local_path('small')} {get_remote_path('it_small')}", "[E005101]")
586        assert check_shell(f"file send -b 1 "
587                           f"{get_local_path('small')} {get_remote_path('it_small')}", "[E005101]")
588        assert check_shell(f"file send -b "
589                           f"{get_local_path('small')} {get_remote_path('it_small')}", "There is no remote path")
590        assert check_shell(f"file recv -b ./{GP.debug_app} "
591                           f"{get_remote_path('it_small')} {get_local_path('small_recv')}", "[E005101]")
592        # 逃逸
593        assert check_shell(f"file send -b {GP.debug_app} "
594                           f"{get_local_path('small')} ../../../it_small", "[E005102]")
595        assert check_shell(f"file recv -b {GP.debug_app} ../../../it_small"
596                           f"{get_local_path('small_recv')} ", "[E005102]")
597        assert check_shell(f"file send -b {GP.debug_app} "
598                           f"{get_local_path('small')} {data_storage_el2_path}/../../../../../ ",
599                           "permission denied" if is_user else "[E005102]")
600
601
602def test_file_option_bundle_error_bugfix():
603    version = "Ver: 3.1.0e"
604    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
605        print("version does not match, ignore this case")
606    else:
607        data_storage_el2_path = "data/storage/el2/base"
608        check_shell(f"shell rm -rf mnt/debug/100/debug_hap/{GP.debug_app}_extra/{data_storage_el2_path}/*")
609        check_shell(f"shell mkdir -p mnt/debug/100/debug_hap/{GP.debug_app}_extra/{data_storage_el2_path}/")
610        outside_error = "[E005102]"
611        # bundle根目录 逃逸
612        for remote_fail_path in ["..", "../..", "../../..", "../../../..",
613                                 "../.", "./..", "../",
614                                 f"../{GP.debug_app}_extra/{data_storage_el2_path}/",
615                                 f"../{GP.debug_app}_notexsit/{data_storage_el2_path}/",
616                                 f"../{GP.debug_app}_notexsit/",
617                                 f"./../../../../../../../../../../aa"]:
618            assert check_shell(f"file send -b {GP.debug_app} "
619                           f"{get_local_path('small')} {remote_fail_path} ", outside_error)
620
621        # bundle根目录 未逃逸
622        for remote_ok_path in ['.', './', './.', '...',
623                               f"../../../../../../../../../../../mnt/debug/100/debug_hap/{GP.debug_app}/"]:
624            assert not check_shell(f"file send -b {GP.debug_app} "
625                            f"{get_local_path('small')} {remote_ok_path}", outside_error)
626
627
628def test_shell_option_bundle_normal():
629    version = "Ver: 3.1.0e"
630    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
631        print("version does not match, ignore this case")
632    else:
633        data_storage_el2_path = "data/storage/el2/base"
634        check_shell(f"shell mkdir -p mnt/debug/100/debug_hap/{GP.debug_app}/{data_storage_el2_path}")
635        assert check_shell(f"shell -b {GP.debug_app} pwd", f"mnt/debug/100/debug_hap/{GP.debug_app}")
636        assert check_shell(f"shell -b {GP.debug_app} cd {data_storage_el2_path}; pwd",
637                           f"mnt/debug/100/debug_hap/{GP.debug_app}/{data_storage_el2_path}")
638        check_shell(f"shell -b {GP.debug_app} touch {data_storage_el2_path}/test01")
639        assert not check_shell(f"shell -b {GP.debug_app} touch {data_storage_el2_path}/test01", "denied")
640        assert not check_shell(f"shell -b {GP.debug_app} touch -a {data_storage_el2_path}/test01", "denied")
641        assert check_shell(f"shell -b {GP.debug_app} ls {data_storage_el2_path}/", "test01")
642        assert check_shell(f"shell           -b            {GP.debug_app} echo            123             ",
643                    "123")
644        check_shell(f"shell -b {GP.debug_app} \"echo 123 > {data_storage_el2_path}/test02\"")
645        assert check_shell(f"shell -b {GP.debug_app} cat {data_storage_el2_path}/test02", "123")
646        check_shell(f"shell -b {GP.debug_app} mkdir {data_storage_el2_path}/test03")
647        assert check_shell(f"shell -b {GP.debug_app} stat {data_storage_el2_path}/test03", "Access")
648        check_shell(f"shell -b {GP.debug_app} rm -rf {data_storage_el2_path}/test01 "
649                    f"{data_storage_el2_path}/test02 {data_storage_el2_path}/test03")
650        assert check_shell(f"shell -b {GP.debug_app} ls {data_storage_el2_path}/test01",
651                           "test01: No such file or directory")
652        assert check_shell(f"shell -b {GP.debug_app} ls {data_storage_el2_path}/test02",
653                           "test02: No such file or directory")
654        assert check_shell(f"shell -b {GP.debug_app} ls {data_storage_el2_path}/test03",
655                           "test03: No such file or directory")
656
657
658def test_shell_option_bundle_error():
659    version = "Ver: 3.1.0e"
660    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
661        print("version does not match, ignore this case")
662    else:
663        # 不存在的bundle名称
664        assert check_shell("shell -b com.XXXX.not.exist.app pwd", "[Fail][E003001]")
665        # 相对路径逃逸1
666        assert check_shell(f"shell -b ../../../../ pwd", "[Fail][E003001")
667        # 相对路径逃逸2
668        assert check_shell(f"shell -b ././././pwd", "[Fail][E003001]")
669        # 相对路径逃逸3
670        assert check_shell(f"shell -b / pwd", "[Fail][E003001]")
671        # 交互式shell不支持
672        assert check_shell(f"shell -b {GP.debug_app}", "[Fail][E003002]")
673        # bundle参数(129)超过128
674        str_len_129 = "a" * 129
675        assert check_shell(f"shell -b {str_len_129} pwd", "[Fail][E003001]")
676        # bundle参数(6)低于7
677        str_len_6 = "a" * 6
678        assert check_shell(f"shell -b {str_len_6} pwd", "[Fail][E003001]")
679        # bundle参数存在非法参数
680        str_invalid = "#########"
681        assert check_shell(f"shell -b {str_invalid} pwd", "[Fail][E003001]")
682        # 其他参数用例1
683        assert check_shell("shell -param 1234567890 pwd", "[Fail][E003003]")
684        # 其他参数用例2
685        assert check_shell("shell -basd {GP.debug_app} pwd", "[Fail][E003003]")
686        # 缺少参数字母
687        assert check_shell(f"shell - {GP.debug_app} ls", "[Fail][E003003]")
688        # 缺少bundle参数1
689        assert check_shell("shell -b ls", "[Fail][E003001]")
690        # 缺少bundle参数2
691        assert check_shell("shell -b", "[Fail][E003005]")
692        # 存在未知参数在前面
693        assert check_shell(f"shell -t -b {GP.debug_app} ls", "[Fail][E003003]")
694        # 参数在后面
695        assert check_shell(f"shell ls -b {GP.debug_app}", "No such file or directory")
696        # 双倍参数
697        assert check_shell(f"shell -b -b {GP.debug_app} ls", "[Fail][E003001]")
698        # 双倍-杠
699        assert check_shell(f"shell --b {GP.debug_app}", "[Fail][E003003]")
700
701
702def test_option_bundle_error_usage_support():
703    version = "Ver: 3.1.0e"
704    if not check_hdc_version("version", version) or not check_hdc_version("shell hdcd -v", version):
705        print("version does not match, ignore this case")
706    else:
707        assert check_shell("shell -b", "Usage: hdc shell [-b bundlename] [COMMAND...]")
708        assert check_shell(f"file recv -b ", "Usage: hdc file recv [-b bundlename] remote local")
709        assert check_shell(f"file send -b ", "Usage: hdc file send [-b bundlename] local remote")
710
711
712def test_hdcd_rom():
713    baseline = 2200 # 2200KB
714    assert check_rom(baseline)
715
716
717def test_smode_permission():
718    check_shell(f"shell rm -rf data/deep_test_dir")
719    assert check_hdc_cmd(f'smode -r')
720    time.sleep(3)
721    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
722    assert check_shell(f"file send {get_local_path('deep_test_dir')} data/",
723                       "permission denied")
724    assert check_shell(f"file send {get_local_path('deep_test_dir')} data/",
725                       "[E005005]")
726
727
728def test_smode_r():
729    assert check_hdc_cmd(f'smode -r')
730    time.sleep(3) # sleep 3s to wait for the device to connect channel
731    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
732    time.sleep(3) # sleep 3s to wait for the device to connect channel
733    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
734    assert check_shell(f"shell id", "context=u:r:sh:s0")
735    assert check_shell(f"file send {get_local_path('deep_test_dir')} data/",
736                       "permission denied")
737
738
739def test_smode():
740    assert check_hdc_cmd(f'smode')
741    time.sleep(3) # sleep 3s to wait for the device to connect channel
742    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
743    time.sleep(3) # sleep 3s to wait for the device to connect channel
744    run_command_with_timeout(f"{GP.hdc_head} wait", 3) # wait 3s for the device to connect channel
745    assert check_shell(f"shell id", "context=u:r:su:s0")
746    assert not check_hdc_cmd("ls /data/log/faultlog/faultlogger | grep hdcd", "hdcd")
747
748
749def test_persist_hdc_mode_tcp():
750    assert check_hdc_cmd(f"shell param set persist.hdc.mode.tcp enable")
751    time.sleep(5)
752    run_command_with_timeout("hdc wait", 3)
753    netstat_listen = get_shell_result(f'shell "netstat -anp | grep tcp | grep hdcd"')
754    assert "LISTEN" in netstat_listen
755    assert "hdcd" in netstat_listen
756    assert check_hdc_cmd(f"shell param set persist.hdc.mode.tcp enable")
757    time.sleep(5)
758    run_command_with_timeout("hdc wait", 3)
759    netstat_listen = get_shell_result(f'shell "netstat -anp | grep tcp | grep hdcd"')
760    assert "LISTEN" in netstat_listen
761    assert "hdcd" in netstat_listen
762    assert check_hdc_cmd(f"shell param set persist.hdc.mode.tcp disable")
763    time.sleep(5)
764    run_command_with_timeout("hdc wait", 3)
765    netstat_listen = get_shell_result(f'shell "netstat -anp | grep tcp | grep hdcd"')
766    assert "LISTEN" not in netstat_listen
767    assert "hdcd" not in netstat_listen
768
769
770def test_persist_hdc_mode_usb():
771    assert check_hdc_cmd(f"shell param set persist.hdc.mode.usb enable")
772    echo_result = get_shell_result(f'shell "echo 12345"')
773    assert "12345" not in echo_result
774    time.sleep(10)
775    run_command_with_timeout("hdc wait", 3)
776    echo_result = get_shell_result(f'shell "echo 12345"')
777    assert "12345" in echo_result
778
779
780def test_persist_hdc_mode_tcp_usb():
781    assert check_hdc_cmd(f"shell param set persist.hdc.mode.tcp enable")
782    time.sleep(5)
783    run_command_with_timeout("hdc wait", 3)
784    assert check_hdc_cmd(f"shell param set persist.hdc.mode.usb enable")
785    time.sleep(10)
786    run_command_with_timeout("hdc wait", 3)
787    netstat_listen = get_shell_result(f'shell "netstat -anp | grep tcp | grep hdcd"')
788    assert "LISTEN" in netstat_listen
789    assert "hdcd" in netstat_listen
790    assert check_hdc_cmd(f"shell param set persist.hdc.mode.tcp disable")
791    time.sleep(5)
792    run_command_with_timeout("hdc wait", 3)
793    netstat_listen = get_shell_result(f'shell "netstat -anp | grep tcp | grep hdcd"')
794    assert "LISTEN" not in netstat_listen
795    assert "hdcd" not in netstat_listen
796
797
798
799def setup_class():
800    print("setting up env ...")
801    check_hdc_cmd("shell rm -rf data/local/tmp/it_*")
802    GP.load()
803
804
805def teardown_class():
806    pass
807
808
809def run_main():
810    if check_library_installation("pytest"):
811        exit(1)
812
813    if check_library_installation("pytest-testreport"):
814        exit(1)
815
816    if check_library_installation("pytest-repeat"):
817        exit(1)
818
819    GP.init()
820
821    prepare_source()
822    update_source()
823
824    choice_default = ""
825    parser = argparse.ArgumentParser()
826    parser.add_argument('--count', type=int, default=1,
827                        help='test times')
828    parser.add_argument('--verbose', '-v', default=__file__,
829                        help='filename')
830    parser.add_argument('--desc', '-d', default='Test for function.',
831                        help='Add description on report')
832    args = parser.parse_args()
833
834    pytest_run(args)
835
836
837if __name__ == "__main__":
838    run_main()
839