• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2025 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import os
16import stat
17import threading
18import time
19import logging
20import multiprocessing
21import pytest
22
23from utils import GP, run_command_with_timeout, get_shell_result, \
24    check_shell, check_version, get_local_path, rmdir, load_gp, get_hdcd_pss, get_hdcd_fd_count
25
# Separator used when composing device-side paths in hdc commands.
SEP = "/"
# Device directory (joined after SEP) that the tests fill up and send files into.
MOUNT_POINT = "storage"
# Default size, in KB, of each leaf file produced by create_binary_tree.
TEST_FILE_SIZE = 20
# Upper bound of the range used to spawn concurrent "file send" processes.
SEND_FILE_PROCESS_COUNT = 25
# Number of extra hdcd file descriptors tolerated over the recorded baseline.
FD_THRESHOLD = 2
# Every (is_compress, is_directory, is_zero) combination, ordered from
# (False, False, True) through (True, True, False).
TEST_FILE_CASE_TABLE = [
    (compress, directory, zero)
    for compress in (False, True)
    for directory in (False, True)
    for zero in (True, False)
]

logger = logging.getLogger(__name__)
43
44
def create_test_file(file_path, size, random=False):
    """Create a new file of ``size`` KiB at ``file_path``.

    Args:
        file_path: Path of the file to create. It must not exist yet because
            the file is opened with ``O_EXCL``.
        size: File size in KiB (the file holds ``size * 1024`` bytes).
            ``0`` produces an empty file.
        random: When true, fill the file with ``os.urandom`` data. Otherwise
            only the last byte (``0xff``) is written after seeking to the
            final offset; the preceding bytes are never written explicitly.

    Raises:
        ValueError: If ``size`` is negative (checked before the file is
            created, so no stray empty file is left behind).
        FileExistsError: If ``file_path`` already exists.
    """
    if size < 0:
        raise ValueError(f"size must be non-negative, got {size}")
    byte_count = size * 1024
    flags = os.O_CREAT | os.O_WRONLY | os.O_EXCL
    modes = stat.S_IWUSR | stat.S_IRUSR  # owner read/write only
    with os.fdopen(os.open(file_path, flags, modes), 'wb') as f:
        if byte_count == 0:
            # Nothing to write; seeking to offset -1 would fail.
            return
        if random:
            f.write(os.urandom(byte_count))
        else:
            f.seek(byte_count - 1)  # jump to the last byte of the file
            f.write(b'\xff')  # a single non-zero byte fixes the file length
54
55
def create_binary_tree(depth, path='.', size=TEST_FILE_SIZE, random=False):
    """Create a binary directory tree with two test files in every leaf.

    Each level below ``path`` contains the sub-directories ``1`` and ``2``.
    At depth 0 two files named ``{size}KB.1.bin`` and ``{size}KB.2.bin`` are
    created with ``create_test_file``.

    Args:
        depth: Number of directory levels to create below ``path``.
        path: Root directory of the tree; created if it does not exist.
        size: Size in KB of each leaf file.
        random: Forwarded to ``create_test_file`` for every leaf file.

    Raises:
        ValueError: If ``depth`` is negative (the recursion would otherwise
            never reach the ``depth == 0`` base case).
    """
    if depth < 0:
        raise ValueError(f"depth must be non-negative, got {depth}")
    os.makedirs(path, exist_ok=True)
    if depth == 0:
        for index in (1, 2):
            create_test_file(os.path.join(path, f'{size}KB.{index}.bin'), size, random=random)
        return

    # Recurse into the left ("1") and right ("2") sub-directories. ``random``
    # must be forwarded, otherwise every leaf falls back to the default.
    for child in ('1', '2'):
        create_binary_tree(depth - 1, os.path.join(path, child), size, random=random)
70
71
class TestFileNoSpaceFdLeak:
    """
    Fill the device storage completely, attempt file transfers, and then
    check hdcd for leaked file descriptors.
    """
    # Baselines recorded at the end of setup_class. A pss of 0 is treated as
    # "query failed" by the checks below.
    fd_count = 0
    pss = 0
    # Growth over the baseline PSS that is logged as a suspected leak.
    PSS_LEAK_MARGIN = 50

    @staticmethod
    def send_file_to_storage(is_compress=False, is_directory=False, is_zero=False, is_mix=False):
        """Send a local file or directory to the full storage 20 times.

        Args:
            is_compress: Add ``-z`` to the ``file send`` command.
            is_directory: Send the ``tree_rand``/``tree_zero`` directory
                instead of the ``medium``/``word_100M.txt`` file.
            is_zero: Select the ``tree_zero``/``word_100M.txt`` variant.
            is_mix: Send the whole local resource directory; overrides
                ``is_directory`` and ``is_zero``.

        Returns:
            True if every attempt printed "space left on device"; False as
            soon as one attempt times out or lacks that message.
        """
        compress_command = "-z" if is_compress else ""
        if is_mix:
            local_path = get_local_path(".")
            target_path = "it_nospace_mix"
        elif is_directory:
            local_path = get_local_path("tree_zero" if is_zero else "tree_rand")
            target_path = "it_nospace"
        else:
            local_path = get_local_path("word_100M.txt" if is_zero else "medium")
            target_path = "it_nospace.bin"
        re_send_time = 20
        for i in range(1, re_send_time + 1):
            logger.info("send %d times", i)
            output_str, error_str = run_command_with_timeout(
                f"{GP.hdc_head} "
                f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}", 25)
            if "Command timed out" in error_str:
                logger.warning("error_str: %s", error_str)
                return False
            if "space left on device" not in output_str:
                logger.warning("output_str: %s", output_str)
                return False
        return True

    @classmethod
    def _log_pss_leak(cls):
        """Compare the current hdcd PSS with the baseline and log the result.

        When either value is 0 the comparison is skipped, because a failed
        query would otherwise be reported as a leak.
        """
        pss_now = get_hdcd_pss()
        if cls.pss == 0 or pss_now == 0:
            logger.error("get hdcd mem pss failed")
            return
        if pss_now > (cls.pss + cls.PSS_LEAK_MARGIN):
            logger.warning("hdcd mem pss leak, original value %d, now value %d", cls.pss, pss_now)

    @classmethod
    def setup_class(cls):
        """Create the local trees, fill the storage and record the baselines."""
        depth = 10  # directory depth
        if not os.path.exists(get_local_path("tree_rand")):
            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
        if not os.path.exists(get_local_path("tree_zero")):
            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        # dd is expected to stop with "space left on device", i.e. the storage is full.
        assert check_shell(f"shell dd if={SEP}dev/zero bs=500M count=24 of={SEP}{MOUNT_POINT}/it_full.img",
                           "space left on device")
        time.sleep(1)
        cls.fd_count = get_hdcd_fd_count()
        cls.pss = get_hdcd_pss()
        if cls.pss == 0:
            logger.error("get hdcd mem pss failed")

    @classmethod
    def teardown_class(cls):
        """Remove the device-side files and local trees, then log the PSS check."""
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        rmdir(get_local_path("tree_rand"))
        rmdir(get_local_path("tree_zero"))
        cls._log_pss_leak()

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
    def test_file_normal(self, is_compress, is_directory, is_zero):
        assert self.send_file_to_storage(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)
        self._log_pss_leak()

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
    def test_file_mix(self, is_compress):
        assert self.send_file_to_storage(is_compress=is_compress, is_mix=True)
        self._log_pss_leak()

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_leak_proc(self):
        # No descriptor of hdcd may still reference one of the sent targets.
        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_count(self):
        time.sleep(1)
        assert get_hdcd_fd_count() <= (self.fd_count + FD_THRESHOLD)
161
162
class TestFileReFullSpaceFdLeak:
    """
    With the storage almost full, transfer files while a background thread
    keeps deleting and refilling the storage, then check hdcd for leaked
    file descriptors.
    """
    # Shared by all tests of the class; cleared at the start of every
    # re_send_file call and set when its send loop finishes.
    stop_flag = threading.Event()
    # Baselines recorded at the end of setup_class. A pss of 0 is treated as
    # "query failed".
    fd_count = 0
    pss = 0

    def re_create_file(self, num=600):
        """Repeatedly delete the ``it_*`` files and refill the storage with dd.

        Runs at most ``num - 1`` rounds and stops after the round in which
        ``stop_flag`` is found set.
        """
        for i in range(1, num):
            check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*;"
                        f" dd if={SEP}dev/zero bs=1M count=10240 of="
                        f"{SEP}{MOUNT_POINT}/it_full.img")
            logger.info("re create file count:%d", i)
            if self.stop_flag.is_set():
                break

    def re_send_file(self, is_compress=False, is_directory=False, is_zero=False, is_mix=False):
        """Send a local file or directory 10 times while the storage is refilled.

        Args:
            is_compress: Add ``-z`` to the ``file send`` command.
            is_directory: Send the ``tree_rand``/``tree_zero`` directory
                instead of the ``medium``/``word_100M.txt`` file.
            is_zero: Select the ``tree_zero``/``word_100M.txt`` variant.
            is_mix: Send the whole local resource directory; overrides
                ``is_directory`` and ``is_zero``.

        Returns:
            True if every attempt printed "Transfer Stop"; False as soon as
            one attempt times out or lacks that message.
        """
        # The event is a class attribute, so a previous call leaves it set;
        # without clearing it the refill thread would stop after one round.
        self.stop_flag.clear()
        re_create_file_thread = threading.Thread(target=self.re_create_file)
        re_create_file_thread.start()
        result = True
        try:
            compress_command = "-z" if is_compress else ""
            if is_mix:
                local_path = get_local_path('.')
                target_path = "it_nospace_mix"
            elif is_directory:
                local_path = get_local_path("tree_zero" if is_zero else "tree_rand")
                target_path = "it_nospace"
            else:
                local_path = get_local_path("word_100M.txt" if is_zero else "medium")
                target_path = "it_nospace.bin"
            re_send_time = 10
            for i in range(1, re_send_time + 1):
                output_str, error_str = run_command_with_timeout(
                    f"{GP.hdc_head} "
                    f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}", 25)
                logger.info("output:%s,error:%s", output_str, error_str)
                if "Command timed out" in error_str:
                    logger.warning("Command timed out")
                    result = False
                    break
                if "Transfer Stop" not in output_str:
                    logger.warning("Transfer Stop NOT FOUNT")
                    result = False
                    break
        finally:
            # Always stop and join the refill thread, even if sending raised.
            self.stop_flag.set()
            re_create_file_thread.join()
        return result

    @classmethod
    def setup_class(cls):
        """Create the local trees, nearly fill the storage and record the baselines."""
        depth = 10  # directory depth
        if not os.path.exists(get_local_path("tree_rand")):
            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
        if not os.path.exists(get_local_path("tree_zero")):
            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
        # Fourth field of the df line; presumably the available space in
        # 1K blocks, since it is used as a 1K block count below - TODO confirm.
        storage_size = get_shell_result(
            f"shell \"df {SEP}{MOUNT_POINT} | grep {MOUNT_POINT}\"").split()[3]
        logger.info("storage size =%s", storage_size)
        available = int(storage_size)
        assert available >= 10
        gen_size = available - 10  # leave 10 blocks free
        logger.info("gen size = %d", gen_size)
        check_shell(f"shell dd if={SEP}dev/zero bs=1K count={gen_size} of="
                    f"{SEP}{MOUNT_POINT}/gen_{gen_size}.img")
        time.sleep(1)
        cls.fd_count = get_hdcd_fd_count()
        cls.pss = get_hdcd_pss()
        if cls.pss == 0:
            logger.error("get hdcd mem pss failed")

    @classmethod
    def teardown_class(cls):
        """Remove the device-side files and local trees, then log the PSS check."""
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
        rmdir(get_local_path("tree_rand"))
        rmdir(get_local_path("tree_zero"))
        pss_now = get_hdcd_pss()
        if cls.pss == 0 or pss_now == 0:
            # A failed query must not be reported as a leak as well.
            logger.error("get hdcd mem pss failed")
        elif pss_now > (cls.pss + 50):
            logger.warning("hdcd mem pss leak, original value %d, now value %d", cls.pss, pss_now)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
    def test_file_normal(self, is_compress, is_directory, is_zero):
        assert self.re_send_file(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
    def test_file_mix(self, is_compress):
        assert self.re_send_file(is_compress=is_compress, is_mix=True)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_compress_z_fd_proc(self):
        # No descriptor of hdcd may still reference one of the sent targets.
        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_compress_z_fd_count(self):
        time.sleep(1)
        assert get_hdcd_fd_count() <= (self.fd_count + FD_THRESHOLD)
266
267
class TestFileNoSpaceFdFullCrash:
    """
    Fill the storage, start many concurrent ``file send`` processes and
    verify that no hdcd fault log appears while the hdcd fd count is watched.
    """
    fd_count = "0"  # raw output of the last "ls /proc/<pid>/fd | wc -l" query
    pid = "0"  # hdcd pid recorded in setup_class
    plist = []  # sender processes started by the test, joined in teardown_class

    @classmethod
    def teardown_class(cls):
        """Wait for the sender processes, then remove what the class created."""
        for p in cls.plist:
            p.join()
        cls.plist.clear()
        # setup_class fills the storage with largefile and creates the local
        # tree; remove both (and any sent it_* targets) after the senders exit.
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/largefile")
        rmdir(get_local_path("tree"))

    @staticmethod
    def new_process_run(cmd):
        """Run ``check_shell(cmd)`` with stdout and stderr redirected to devnull."""
        # os.dup2 returns the target descriptor, so the originals have to be
        # duplicated with os.dup before they are overwritten.
        saved_stdout = os.dup(1)
        saved_stderr = os.dup(2)
        try:
            with open(os.devnull, 'w') as devnull:
                os.dup2(devnull.fileno(), 1)
                os.dup2(devnull.fileno(), 2)
                check_shell(cmd)
        finally:
            os.dup2(saved_stdout, 1)
            os.dup2(saved_stderr, 2)
            os.close(saved_stdout)
            os.close(saved_stderr)

    @classmethod
    def setup_class(cls):
        """Create the local tree, fill the storage and record the hdcd pid and fd count."""
        depth = 10  # directory depth
        if not os.path.exists(get_local_path("tree")):
            create_binary_tree(depth, get_local_path("tree"), random=True)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell dd if={SEP}dev/zero bs=1K count=10 of={SEP}{MOUNT_POINT}/smallfile")
        # No count given: dd runs until it stops on its own.
        check_shell(f"shell dd if={SEP}dev/zero bs=1M of={SEP}{MOUNT_POINT}/largefile")
        check_shell(f"shell df {SEP}{MOUNT_POINT}")
        # Removing the 10K file frees a small amount of space again.
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/smallfile")
        time.sleep(1)
        check_shell("shell ls")
        cls.pid = get_shell_result("shell pidof hdcd").split("\r")[0]
        cls.fd_count = get_shell_result(f"shell ls {SEP}proc/{cls.pid}/fd | wc -l")

    @staticmethod
    def _assert_no_faultlog_then_kill():
        """Assert that no hdcd fault log exists, then run ``hdc kill``."""
        assert not check_shell("shell ls data/log/faultlog/faultlogger/", "-hdcd-")
        run_command_with_timeout(f"{GP.hdc_head} kill", 3)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_full_no_crash(self):
        cmd = f"file send {get_local_path('tree')} {SEP}{MOUNT_POINT}/it_tree"
        # range(1, N) starts N - 1 sender processes.
        for _ in range(1, SEND_FILE_PROCESS_COUNT):
            p = multiprocessing.Process(target=self.new_process_run, args=(cmd,))
            p.start()
            self.plist.append(p)

        last_fd_count = 0
        loop_count = 0
        equal_count = 0
        while True:
            self.fd_count = get_shell_result(f"shell ls {SEP}proc/{self.pid}/fd | wc -l")
            # Formatting through the logger avoids a TypeError on non-str output.
            logger.warning("fd_count is ff%s", self.fd_count)
            try:
                fd_now = int(self.fd_count)
            except ValueError:
                logger.warning("ValueError")
                break
            except TypeError:
                logger.warning("TypeError output:%s", self.fd_count)
                break

            if fd_now >= 32768:
                break
            if fd_now < last_fd_count:
                logger.warning("fd count decrease.")
                self._assert_no_faultlog_then_kill()
                return
            if fd_now == last_fd_count:
                equal_count += 1
            if equal_count > 20:
                logger.warning("equal 20 times.")
                self._assert_no_faultlog_then_kill()
                return
            last_fd_count = fd_now
            loop_count += 1
            if loop_count % 5 == 0:
                logger.warning("fd count is:%s", self.fd_count)

        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
        run_command_with_timeout(f"{GP.hdc_head} kill", 3)

        run_command_with_timeout(f"{GP.hdc_head} wait", 3)
        time.sleep(3)
        assert not check_shell("shell ls data/log/faultlog/faultlogger/", "-hdcd-")
350