• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import os
16import time
17import logging
18import multiprocessing
19import platform
20import pytest
21import tempfile
22
23from utils import GP, check_shell, run_command_with_timeout, get_cmd_block_output, get_shell_result, get_end_symbol, load_gp
24
25logger = logging.getLogger(__name__)
26
27
28class TestSessionLifeCycle:
29
30    def server_run_method(self, cmd):
31        get_cmd_block_output(cmd, timeout=30)
32
33    def remove(self, source_line, target_list):
34        for item in target_list:
35            if source_line.find(item) >= 0:
36                target_list.remove(item)
37                return (True, target_list)
38        return (False, target_list)
39
40    @pytest.mark.L0
41    def test_session_lifecycle_by_reboot(self):
42        check_shell(f"kill")
43        time.sleep(3)
44
45        cmd = f"hdc -m"
46        p = multiprocessing.Process(target=self.server_run_method, args=(cmd,))
47        p.start()
48
49        run_command_with_timeout(f"{GP.hdc_head} wait", 20)
50
51        time.sleep(3)
52        devices = get_shell_result(f"list targets").split(get_end_symbol())
53        target_lines = list()
54        for item in devices:
55            if len(item) > 0:
56                connect_key = f"{item[:3]}******{item[-3:]}(L:{len(item)})"
57                line = f"connectKey:{connect_key} connType:0 connect state:1 faultInfo:"
58                target_lines.append(line)
59                line = f"connectKey:{connect_key} connType:0 connect state:0 faultInfo:LIBUSB_TRANSFER_"
60                target_lines.append(line)
61        time.sleep(3)
62        check_shell(f"shell reboot")
63        time.sleep(5)
64        check_shell(f"kill")
65        is_ohos = "Harmony" in platform.system()
66        # 检查serverOutput是否包含上述字符串
67        if not is_ohos:
68            tmp_path = tempfile.gettempdir()
69        else:
70            tmp_path = os.path.expanduser("~")
71        hdc_log_path = f"{tmp_path}{os.sep}hdc.log"
72        with open(hdc_log_path, 'r') as file:
73            for line in file.readlines():
74                (_ret, _target_lines1) = self.remove(line, target_lines)
75                target_lines = _target_lines1
76        p.join()
77        assert len(target_lines) == 0
78
79    @pytest.mark.L0
80    def test_session_lifecycle_of_multi_tcp(self):
81        check_shell(f"kill")
82        time.sleep(3)
83        p = multiprocessing.Process(target=self.server_run_method, args=(f"hdc -m",))
84        p.start()
85
86        run_command_with_timeout(f"{GP.hdc_head} wait", 20)
87        fport_tcp_port = 27772
88        fport_tcp_host_port = 20001
89        assert check_shell(f"tmode port {fport_tcp_port}", "Set device run mode successful")
90        run_command_with_timeout(f"{GP.hdc_head} wait", 20)
91        for i in range(4):
92            assert check_shell(f"fport tcp:{fport_tcp_host_port + i} tcp:{fport_tcp_port}", "Forwardport result:OK")
93        time.sleep(3)
94        for i in range(4):
95            assert check_shell(f"tconn 127.0.0.1:{fport_tcp_host_port + i}", "Connect OK")
96        devices = get_shell_result(f"list targets").split(get_end_symbol())
97        target_lines = list()
98        for item in devices:
99            if len(item) > 0 and item.find(".") >= 0:
100                connect_key = f"{item[:3]}******{item[-3:]}(L:{len(item)})"
101                line = f"connectKey:{connect_key} connType:1 connect state:1 faultInfo:"
102                target_lines.append(line)
103                line = f"connectKey:{connect_key} connType:1 connect state:0 faultInfo:end of file"
104                if item == f"127.0.0.1:{fport_tcp_host_port + 3}":
105                    line = line + " commandCount:3"
106                target_lines.append(line)
107
108        time.sleep(1)
109        item = f"127.0.0.1:{fport_tcp_host_port + 3}"
110        connect_key = f"{item[:3]}******{item[-3:]}(L:{len(item)})"
111        ls_result_line = f"connectKey:{connect_key} command flag:1001 command parameters:ls command result:1 command take time:"
112        for i in range(3):
113            target_lines.append(ls_result_line)
114            check_shell(f"-t 127.0.0.1:{fport_tcp_host_port + 3} shell ls")
115
116        for i in range(4):
117            assert check_shell(f"fport rm tcp:{fport_tcp_host_port + i} tcp:{fport_tcp_port}",
118                               "Remove forward ruler success")
119        time.sleep(5)
120        check_shell(f"kill")
121        is_ohos = "Harmony" in platform.system()
122        # 检查serverOutput是否包含上述字符串
123        if not is_ohos:
124            tmp_path = tempfile.gettempdir()
125        else:
126            tmp_path = os.path.expanduser("~")
127        hdc_log_path = f"{tmp_path}{os.sep}hdc.log"
128        with open(hdc_log_path, 'r') as file:
129            for line in file.readlines():
130                (_ret, _target_lines1) = self.remove(line, target_lines)
131                target_lines = _target_lines1
132
133        p.join()
134        assert len(target_lines) == 0