• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import pytest
16from utils import GP, check_shell_any_device, check_hdc_cmd, get_shell_result, load_gp, get_local_path, \
17    get_local_md5, check_unsupport_systems
18from utils import server_loop
19from utils import client_get_file
20import threading
21import os
22
23
24class TestXportCommand:
25    @pytest.mark.L0
26    @check_unsupport_systems(["Harmony"])
27    def test_fport_cmd_output(self):
28        local_port = 18070
29        remote_port = 11080
30        fport_arg = f"tcp:{local_port} tcp:{remote_port}"
31        assert check_hdc_cmd(f"fport {fport_arg}", "Forwardport result:OK")
32        assert check_shell_any_device(f"netstat -an", "LISTEN", False)[0]
33        assert check_shell_any_device(f"netstat -an", f"{local_port}", False)[0]
34        assert check_hdc_cmd(f"fport ls", fport_arg)
35        assert check_hdc_cmd(f"fport rm {fport_arg}", "success")
36
37    @pytest.mark.L0
38    def test_rport_cmd_output(self):
39        local_port = 17090
40        remote_port = 11080
41        rport_arg = f"tcp:{local_port} tcp:{remote_port}"
42        assert check_hdc_cmd(f"rport {rport_arg}", "Forwardport result:OK")
43        netstat_line = get_shell_result(f'shell "netstat -anp | grep {local_port}"')
44        assert "LISTEN" in netstat_line
45        assert "hdcd" in netstat_line
46        fport_list = get_shell_result(f"fport ls")
47        assert "Reverse" in fport_list
48        assert rport_arg in fport_list
49        assert check_hdc_cmd(f"fport rm {rport_arg}", "success")
50
51    @pytest.mark.L0
52    @check_unsupport_systems(["Harmony"])
53    def test_xport_data_comm(self):
54        """
55        test fport rport data communicate.
56
57        1. fport host 17091(listen) -> daemon 11081;
58        2. rport daemon 11081(listen) -> host 18000;
59        3. pc client connect 17091, server listen 18000;
60        4. Communication Link: client -> 17091 host -|- 11081 daemon 11081 -|- host -> 18000 server;
61        """
62        client_connect_port = 17091
63        daemon_transmit_port = 11081
64        server_listen_port = 18000
65
66        # creat transmit env
67        # fport
68        fport_arg = f"tcp:{client_connect_port} tcp:{daemon_transmit_port}"
69        assert check_hdc_cmd(f"fport {fport_arg}", "Forwardport result:OK")
70        assert check_shell_any_device(f"netstat -an", "LISTEN", False)[0]
71        assert check_shell_any_device(f"netstat -an", f"{client_connect_port}", False)[0]
72        assert check_hdc_cmd(f"fport ls", fport_arg)
73
74        # rport
75        rport_arg = f"tcp:{daemon_transmit_port} tcp:{server_listen_port}"
76        assert check_hdc_cmd(f"rport {rport_arg}", "Forwardport result:OK")
77        netstat_line = get_shell_result(f'shell "netstat -anp | grep {daemon_transmit_port}"')
78        assert "LISTEN" in netstat_line
79        assert "hdcd" in netstat_line
80        fport_list = get_shell_result(f"fport ls")
81        assert "Reverse" in fport_list
82        assert rport_arg in fport_list
83
84        # transmit file start
85        file_name = "medium"
86        file_save_name = f"{file_name}_recv_fport"
87        file_path = get_local_path(file_save_name)
88        if os.path.exists(file_path):
89            print(f"client: file {file_path} exist, delete")
90            try:
91                os.remove(file_path)
92            except OSError as error:
93                print(f"delete {file_path} failed: {error.strerror}")
94                assert check_hdc_cmd(f"fport rm {fport_arg}", "success")
95                assert check_hdc_cmd(f"fport rm {rport_arg}", "success")
96                assert False
97
98        server_thread = threading.Thread(target=server_loop, args=(server_listen_port,))
99        server_thread.start()
100
101        client_get_file(client_connect_port, file_name, file_save_name)
102        server_thread.join()
103
104        assert check_hdc_cmd(f"fport rm {fport_arg}", "success")
105        assert check_hdc_cmd(f"fport rm {rport_arg}", "success")
106
107        ori_file_md5 = get_local_md5(get_local_path(file_name))
108        new_file = get_local_path(file_save_name)
109        assert os.path.exists(new_file)
110        new_file_md5 = get_local_md5(new_file)
111        print(f"ori_file_md5:{ori_file_md5}, new_file_md5:{new_file_md5}")
112        assert ori_file_md5 == new_file_md5
113
114    @pytest.mark.L0
115    def test_fport_cmd(self):
116        fport_list = []
117        rport_list = []
118        start_port = 10000
119        end_port = 10020
120        for i in range(start_port, end_port):
121            fport = f"tcp:{i+100} tcp:{i+200}"
122            rport = f"tcp:{i+300} tcp:{i+400}"
123            localabs = f"tcp:{i+500} localabstract:{f'helloworld.com.app.{i+600}'}"
124            fport_list.append(fport)
125            rport_list.append(rport)
126            fport_list.append(localabs)
127
128        for fport in fport_list:
129            assert check_hdc_cmd(f"fport {fport}", "Forwardport result:OK")
130            assert check_hdc_cmd(f"fport {fport}", "TCP Port listen failed at")
131            assert check_hdc_cmd("fport ls", fport)
132
133        for fport in fport_list:
134            assert check_hdc_cmd(f"fport rm {fport}", "success")
135            assert not check_hdc_cmd("fport ls", fport)
136
137        for rport in rport_list:
138            assert check_hdc_cmd(f"rport {rport}", "Forwardport result:OK")
139            assert check_hdc_cmd(f"rport {rport}", "TCP Port listen failed at")
140            assert check_hdc_cmd("rport ls", rport) or check_hdc_cmd("fport ls", rport)
141
142        for rport in rport_list:
143            assert check_hdc_cmd(f"fport rm {rport}", "success")
144            assert not check_hdc_cmd("rport ls", fport) and not check_hdc_cmd("fport ls", fport)
145
146        task_str1 = "tcp:33333 tcp:33333"
147        assert check_hdc_cmd(f"fport {task_str1}", "Forwardport result:OK")
148        assert check_hdc_cmd(f"fport rm {task_str1}", "success")
149        assert check_hdc_cmd(f"fport {task_str1}", "Forwardport result:OK")
150        assert check_hdc_cmd(f"fport rm {task_str1}", "success")
151
152        task_str2 = "tcp:44444 tcp:44444"
153        assert check_hdc_cmd(f"rport {task_str2}", "Forwardport result:OK")
154        assert check_hdc_cmd(f"fport rm {task_str2}", "success")
155        assert check_hdc_cmd(f"rport {task_str2}", "Forwardport result:OK")
156        assert check_hdc_cmd(f"fport rm {task_str2}", "success")
157