• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import os
16import stat
17import threading
18import time
19import logging
20import pytest
21import multiprocessing
22
23from utils import GP, run_command_with_timeout, get_shell_result, \
24    check_shell, check_version, get_local_path, rmdir, load_gp
25
26SEP = "/"
27MOUNT_POINT = "storage"
28TEST_FILE_SIZE = 20  # 20KB
29SEND_FILE_PROCESS_COUNT = 25
30TEST_FILE_CASE_TABLE = [
31    (False, False, True),
32    (False, False, False),
33    (False, True, True),
34    (False, True, False),
35    (True, False, True),
36    (True, False, False),
37    (True, True, True),
38    (True, True, False),
39]
40
41logger = logging.getLogger(__name__)
42
43
44def create_test_file(file_path, size, random=False):
45    flags = os.O_CREAT | os.O_WRONLY | os.O_EXCL
46    modes = stat.S_IWUSR | stat.S_IRUSR
47    with os.fdopen(os.open(file_path, flags, modes), 'wb') as f:
48        if random:
49            f.write(os.urandom(size * 1024))  # 100KB
50        else:
51            f.seek(size * 1024 - 1)  # 移动到文件末尾
52            f.write(b'\xff')  # 写入一个零字节
53
54
55def create_binary_tree(depth, path='.', size=TEST_FILE_SIZE, random=False):
56    if depth == 0:
57        create_test_file(os.path.join(path, f'{size}KB.1.bin'), size, random=random)
58        create_test_file(os.path.join(path, f'{size}KB.2.bin'), size, random=random)
59    else:
60        # 创建左右子目录
61        left_path = os.path.join(path, '1')
62        right_path = os.path.join(path, '2')
63        os.makedirs(left_path, exist_ok=True)
64        os.makedirs(right_path, exist_ok=True)
65
66        # 递归创建下一层目录
67        create_binary_tree(depth - 1, left_path, size)
68        create_binary_tree(depth - 1, right_path, size)
69
70
71class TestFileNoSpaceFdLeak:
72    """
73    直接填满磁盘空间进行文件传输,传输后查询fd泄露状态。
74    """
75    fd_count = 0
76
77    @staticmethod
78    def send_file_to_storage(is_compress=False, is_directory=False, is_zero=False, is_mix=False):
79        compress_command = "-z" if is_compress else ""
80        single_file_name = "medium" if not is_zero else "word_100M.txt"
81        single_dir_name = "tree_rand" if not is_zero else "tree_zero"
82        local_path = get_local_path(single_dir_name) if is_directory else get_local_path(single_file_name)
83        target_path = "it_nospace" if is_directory else "it_nospace.bin"
84        if is_mix:
85            local_path = get_local_path(".")
86            target_path = "it_nospace_mix"
87        re_send_time = 10
88        for i in range(1, re_send_time + 1):
89            logger.info("send %d times", i)
90            output_str, error_str = run_command_with_timeout(f"{GP.hdc_head} "
91                                                             f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}",
92                                                             25)
93            if "Command timed out" in error_str:
94                logger.warning("error_str: %s", error_str)
95                return False
96            if "space left on device" not in output_str:
97                logger.warning(f"output_str: %s", output_str)
98                return False
99        return True
100
101    @staticmethod
102    def teardown_class():
103        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
104        rmdir(get_local_path("tree_rand"))
105        rmdir(get_local_path("tree_zero"))
106
107    def setup_class(self):
108        depth = 10  # 目录深度
109        if not os.path.exists(get_local_path("tree_rand")):
110            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
111        if not os.path.exists(get_local_path("tree_zero")):
112            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
113        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
114        assert check_shell(f"shell dd if={SEP}dev/zero bs=500M count=24 of={SEP}storage/it_full.img",
115                           "space left on device")
116        time.sleep(1)
117        pid = get_shell_result(f"shell pidof hdcd").split("\r")[0]
118        self.fd_count = get_shell_result(f"shell ls {SEP}proc/{pid}/fd | wc -l")
119
120    @pytest.mark.L2
121    @check_version("Ver: 3.1.0e")
122    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
123                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
124                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
125    def test_file_normal(self, is_compress, is_directory, is_zero):
126        assert self.send_file_to_storage(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)
127
128    @pytest.mark.L2
129    @check_version("Ver: 3.1.0e")
130    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
131    def test_file_mix(self, is_compress):
132        assert self.send_file_to_storage(is_compress=is_compress, is_mix=True)
133
134    @pytest.mark.L2
135    @check_version("Ver: 3.1.0e")
136    def test_file_fd_leak_proc(self):
137        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")
138
139    @pytest.mark.L2
140    @check_version("Ver: 3.1.0e")
141    def test_file_fd_count(self):
142        time.sleep(1)
143        pid = get_shell_result(f"shell pidof hdcd").split("\r")[0]
144        assert get_shell_result(f"shell ls {SEP}proc/{pid}/fd | wc -l") <= self.fd_count
145
146
147class TestFileReFullSpaceFdLeak:
148    """
149    磁盘空间接近满情况,进行文件传输,不断地删除并重传,传输后查询fd泄露状态。
150    """
151    stop_flag = threading.Event()
152    fd_count = 0
153
154    @staticmethod
155    def teardown_class(self):
156        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
157        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
158        rmdir(get_local_path("tree_rand"))
159        rmdir(get_local_path("tree_zero"))
160
161    def re_create_file(self, num=600):
162        for i in range(1, num):
163            check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*;"
164                        f" dd if={SEP}dev/zero bs=1M count=10240 of="
165                        f"{SEP}storage/it_full.img")
166            logger.info("re create file count:%d", i)
167            if self.stop_flag.is_set():
168                break
169
170    def re_send_file(self, is_compress=False, is_directory=False, is_zero=False, is_mix=False):
171        re_create_file_thread = threading.Thread(target=self.re_create_file)
172        re_create_file_thread.start()
173        result = True
174        compress_command = "-z" if is_compress else ""
175        single_file_name = "medium" if not is_zero else "word_100M.txt"
176        single_dir_name = "tree_rand" if not is_zero else "tree_zero"
177        local_path = get_local_path(single_dir_name) if is_directory else get_local_path(single_file_name)
178        target_path = "it_nospace" if is_directory else "it_nospace.bin"
179        if is_mix:
180            local_path = get_local_path('.')
181            target_path = "it_nospace_mix"
182        re_send_time = 10
183        for i in range(1, re_send_time + 1):
184            output_str, error_str = run_command_with_timeout(f"{GP.hdc_head} "
185                                                             f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}",
186                                                             25)
187            logger.info("output:%s,error:%s", output_str, error_str)
188            if "Command timed out" in error_str:
189                logger.warning("Command timed out")
190                result = False
191                break
192            if "Transfer Stop" not in output_str:
193                logger.warning("Transfer Stop NOT FOUNT")
194                result = False
195                break
196        self.stop_flag.set()
197        re_create_file_thread.join()
198        return result
199
200    def setup_class(self):
201        depth = 10  # 目录深度
202        if not os.path.exists(get_local_path("tree_rand")):
203            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
204        if not os.path.exists(get_local_path("tree_zero")):
205            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
206        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
207        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
208        storage_size = get_shell_result(
209            f"shell \"df {SEP}{MOUNT_POINT} | grep {MOUNT_POINT}\"").split()[3]
210        logger.info("storage size =%s", storage_size)
211        assert int(storage_size) >= 10
212        gen_size = int(storage_size) - 10
213        logger.info("gen size = %d", gen_size)
214        check_shell(f"shell dd if={SEP}dev/zero bs=1K count={gen_size} of="
215                    f"{SEP}{MOUNT_POINT}/gen_{gen_size}.img")
216        time.sleep(1)
217        pid = get_shell_result(f"shell pidof hdcd").split("\r")[0]
218        self.fd_count = get_shell_result(f"shell ls {SEP}proc/{pid}/fd | wc -l")
219
220    @pytest.mark.L2
221    @check_version("Ver: 3.1.0e")
222    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
223                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
224                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
225    def test_file_normal(self, is_compress, is_directory, is_zero):
226        assert self.re_send_file(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)
227
228    @pytest.mark.L2
229    @check_version("Ver: 3.1.0e")
230    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
231    def test_file_mix(self, is_compress):
232        assert self.re_send_file(is_compress=is_compress, is_mix=True)
233
234    @pytest.mark.L2
235    @check_version("Ver: 3.1.0e")
236    def test_file_compress_z_fd_proc(self):
237        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")
238
239    @pytest.mark.L2
240    @check_version("Ver: 3.1.0e")
241    def test_file_compress_z_fd_count(self):
242        time.sleep(1)
243        pid = get_shell_result(f"shell pidof hdcd").split("\r")[0]
244        assert get_shell_result(f"shell ls {SEP}proc/{pid}/fd | wc -l") <= self.fd_count
245
246
247class TestFileNoSpaceFdFullCrash:
248    fd_count = "0"
249    pid = "0"
250    plist = list()
251
252    @staticmethod
253    def teardown_class(self):
254        for p in self.plist:
255            p.join()
256
257    def setup_class(self):
258        depth = 10
259        if not os.path.exists(get_local_path("tree")):
260            create_binary_tree(depth, get_local_path("tree"), random=True)
261        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
262        check_shell(f"shell dd if={SEP}dev/zero bs=1K count=10 of={SEP}storage/smallfile")
263        check_shell(f"shell dd if={SEP}dev/zero bs=1M of={SEP}storage/largefile")
264        check_shell(f"shell df /storage")
265        check_shell(f"shell rm -rf /storage/smallfile")
266        check_shell(f"shell rm /data/log/faultlog/faultlogger/cppcrash*hdcd*")
267        time.sleep(1)
268        tree_path = get_local_path("tree")
269        check_shell(f"shell ls")
270        self.pid = get_shell_result(f"shell pidof hdcd").split("\r")[0]
271        self.fd_count = get_shell_result(f"shell ls {SEP}proc/{self.pid}/fd | wc -l")
272
273    def new_process_run(self, cmd):
274        with open(os.devnull, 'w') as devnull:
275            old_stdout = os.dup2(devnull.fileno(), 1)
276            old_stderr = os.dup2(devnull.fileno(), 2)
277            try:
278                check_shell(f"{cmd}")
279            finally:
280                os.dup2(old_stdout, 1)
281                os.dup2(old_stderr, 2)
282
283    @pytest.mark.L2
284    @check_version("Ver: 3.1.0e")
285    def test_file_fd_full_no_crash(self):
286        for i in range(1, SEND_FILE_PROCESS_COUNT):
287            cmd = "file send " + get_local_path("tree") + f" {SEP}storage/it_tree"
288            p = multiprocessing.Process(target=self.new_process_run, args=(cmd,))
289            p.start()
290            self.plist.append(p)
291
292        last_fd_count = 0
293        loop_count = 0
294        while True:
295            self.fd_count = get_shell_result(f"shell ls {SEP}proc/{self.pid}/fd | wc -l")
296            try:
297                if int(self.fd_count) >= 32768:
298                    break
299                if int(self.fd_count) < last_fd_count:
300                    run_command_with_timeout(f"{GP.hdc_head} kill", 3)
301                    return
302                last_fd_count = int(self.fd_count)
303                loop_count += 1
304                if loop_count % 5 == 0:
305                    logger.warning("fd count is:%s", self.fd_count)
306            except ValueError:
307                logger.warning("ValueError, last count:%d", last_fd_count)
308                break
309        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
310        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
311
312        run_command_with_timeout(f"{GP.hdc_head} wait", 3)
313        time.sleep(3)
314
315        assert not check_shell(f"shell ls {SEP}data/log/faultlog/faultlogger/", "-hdcd-")
316        new_pid = get_shell_result(f"shell pidof hdcd").split("\r")[0]
317        assert not self.pid == new_pid
318