#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""File-transfer tests that check hdcd for fd leaks when /storage is full."""
import os
import stat
import threading
import time
import logging
import pytest
import multiprocessing

from utils import GP, run_command_with_timeout, get_shell_result, \
    check_shell, check_version, get_local_path, rmdir, load_gp

SEP = "/"
MOUNT_POINT = "storage"
TEST_FILE_SIZE = 20  # size of each generated test file, in KB
SEND_FILE_PROCESS_COUNT = 25
# Threshold at which test_file_fd_full_no_crash stops polling the fd count.
FD_COUNT_LIMIT = 32768
# (is_compress, is_directory, is_zero) combinations used to parametrize tests.
TEST_FILE_CASE_TABLE = [
    (False, False, True),
    (False, False, False),
    (False, True, True),
    (False, True, False),
    (True, False, True),
    (True, False, False),
    (True, True, True),
    (True, True, False),
]

logger = logging.getLogger(__name__)


def create_test_file(file_path, size, random=False):
    """Create a new file of ``size`` KB at ``file_path``.

    The file is opened with ``O_EXCL``, so an existing file raises
    ``FileExistsError``. It is created readable/writable by the owner only.

    Args:
        file_path: path of the file to create.
        size: file size in KB.
        random: when true the file is filled with ``os.urandom`` data;
            otherwise only the final byte (0xff) is written after seeking to
            ``size * 1024 - 1``.
    """
    flags = os.O_CREAT | os.O_WRONLY | os.O_EXCL
    modes = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(file_path, flags, modes), 'wb') as f:
        if random:
            f.write(os.urandom(size * 1024))  # size KB of random bytes
        else:
            f.seek(size * 1024 - 1)  # move to the last byte of the file
            f.write(b'\xff')  # write a single 0xff byte to fix the length


def create_binary_tree(depth, path='.', size=TEST_FILE_SIZE, random=False):
    """Create a binary directory tree with two test files in every leaf.

    Each level creates sub-directories ``1`` and ``2``; at ``depth == 0`` the
    files ``{size}KB.1.bin`` and ``{size}KB.2.bin`` are created in ``path``.

    Args:
        depth: number of directory levels below ``path``.
        path: root directory of the tree; created if missing.
        size: size of every leaf file in KB.
        random: forwarded to ``create_test_file`` for every leaf file.
    """
    os.makedirs(path, exist_ok=True)
    if depth == 0:
        create_test_file(os.path.join(path, f'{size}KB.1.bin'), size, random=random)
        create_test_file(os.path.join(path, f'{size}KB.2.bin'), size, random=random)
        return

    # Create the left and right sub-directories, then recurse one level down.
    # ``random`` must be forwarded, otherwise every leaf falls back to the
    # non-random default.
    for child in ('1', '2'):
        create_binary_tree(depth - 1, os.path.join(path, child), size, random=random)


def _hdcd_pid():
    """Return the first line of ``pidof hdcd`` as reported by the device shell."""
    return get_shell_result("shell pidof hdcd").split("\r")[0]


def _hdcd_fd_count(pid):
    """Return the raw ``ls /proc/<pid>/fd | wc -l`` shell output for ``pid``."""
    return get_shell_result(f"shell ls {SEP}proc/{pid}/fd | wc -l")


def _to_count(raw):
    """Convert raw ``wc -l`` output to ``int`` (raises ``ValueError`` if not numeric)."""
    return int(str(raw).strip())


def _resolve_transfer_paths(is_directory, is_zero, is_mix):
    """Pick the local source path and the remote target name for a transfer.

    Returns:
        ``(local_path, target_path)``; ``target_path`` is a name prefix, the
        caller appends ``_<n>`` and the mount point.
    """
    single_file_name = "medium" if not is_zero else "word_100M.txt"
    single_dir_name = "tree_rand" if not is_zero else "tree_zero"
    local_path = get_local_path(single_dir_name) if is_directory else get_local_path(single_file_name)
    target_path = "it_nospace" if is_directory else "it_nospace.bin"
    if is_mix:
        local_path = get_local_path(".")
        target_path = "it_nospace_mix"
    return local_path, target_path


class TestFileNoSpaceFdLeak:
    """
    Fill the disk completely, transfer files, then check hdcd for fd leaks.
    """
    # Raw fd-count output captured in setup_class, used as the baseline.
    fd_count = 0

    @staticmethod
    def send_file_to_storage(is_compress=False, is_directory=False, is_zero=False, is_mix=False):
        """Send a file or directory to the full storage ten times.

        Returns:
            True when every attempt reports "space left on device" without
            timing out; False on the first attempt that does not.
        """
        compress_command = "-z" if is_compress else ""
        local_path, target_path = _resolve_transfer_paths(is_directory, is_zero, is_mix)
        re_send_time = 10
        for i in range(1, re_send_time + 1):
            logger.info("send %d times", i)
            output_str, error_str = run_command_with_timeout(
                f"{GP.hdc_head} "
                f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}",
                25)
            if "Command timed out" in error_str:
                logger.warning("error_str: %s", error_str)
                return False
            if "space left on device" not in output_str:
                logger.warning("output_str: %s", output_str)
                return False
        return True

    @classmethod
    def teardown_class(cls):
        """Remove the remote ``it_*`` files and the local generated trees."""
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        rmdir(get_local_path("tree_rand"))
        rmdir(get_local_path("tree_zero"))

    @classmethod
    def setup_class(cls):
        """Generate local trees, fill the storage and record the fd baseline."""
        depth = 10  # directory depth
        if not os.path.exists(get_local_path("tree_rand")):
            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
        if not os.path.exists(get_local_path("tree_zero")):
            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        # dd is expected to stop with an out-of-space error once storage is full.
        assert check_shell(f"shell dd if={SEP}dev/zero bs=500M count=24 of={SEP}storage/it_full.img",
                           "space left on device")
        time.sleep(1)
        pid = _hdcd_pid()
        cls.fd_count = _hdcd_fd_count(pid)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
    def test_file_normal(self, is_compress, is_directory, is_zero):
        assert self.send_file_to_storage(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
    def test_file_mix(self, is_compress):
        assert self.send_file_to_storage(is_compress=is_compress, is_mix=True)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_leak_proc(self):
        # No fd of hdcd may still reference a transferred "it_nospace" target.
        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_count(self):
        time.sleep(1)
        pid = _hdcd_pid()
        # Compare as integers: the shell output is text, and a string
        # comparison would be lexicographic ("9" > "10").
        assert _to_count(_hdcd_fd_count(pid)) <= _to_count(self.fd_count)


class TestFileReFullSpaceFdLeak:
    """
    With the disk nearly full, transfer files while a background thread keeps
    deleting and re-creating the filler file, then check hdcd for fd leaks.
    """
    # Signals the background re-create thread to stop.
    stop_flag = threading.Event()
    # Raw fd-count output captured in setup_class, used as the baseline.
    fd_count = 0

    @classmethod
    def teardown_class(cls):
        """Remove the remote ``it_*``/``gen_*`` files and the local trees."""
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
        rmdir(get_local_path("tree_rand"))
        rmdir(get_local_path("tree_zero"))

    def re_create_file(self, num=600):
        """Repeatedly delete ``it_*`` and refill storage until ``stop_flag`` is set.

        Runs at most ``num - 1`` iterations; the flag is checked after each one.
        """
        for i in range(1, num):
            check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*;"
                        f" dd if={SEP}dev/zero bs=1M count=10240 of="
                        f"{SEP}storage/it_full.img")
            logger.info("re create file count:%d", i)
            if self.stop_flag.is_set():
                break

    def re_send_file(self, is_compress=False, is_directory=False, is_zero=False, is_mix=False):
        """Send a file or directory ten times while storage is being refilled.

        Returns:
            True when every attempt reports "Transfer Stop" without timing
            out; False on the first attempt that does not.
        """
        # The Event is shared at class level; clear it so the background
        # thread of this call is not stopped by a previous call's signal.
        self.stop_flag.clear()
        re_create_file_thread = threading.Thread(target=self.re_create_file)
        re_create_file_thread.start()
        result = True
        try:
            compress_command = "-z" if is_compress else ""
            local_path, target_path = _resolve_transfer_paths(is_directory, is_zero, is_mix)
            re_send_time = 10
            for i in range(1, re_send_time + 1):
                output_str, error_str = run_command_with_timeout(
                    f"{GP.hdc_head} "
                    f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}",
                    25)
                logger.info("output:%s,error:%s", output_str, error_str)
                if "Command timed out" in error_str:
                    logger.warning("Command timed out")
                    result = False
                    break
                if "Transfer Stop" not in output_str:
                    logger.warning("Transfer Stop NOT FOUNT")
                    result = False
                    break
        finally:
            # Always stop and join the background thread, even on an exception.
            self.stop_flag.set()
            re_create_file_thread.join()
        return result

    @classmethod
    def setup_class(cls):
        """Generate local trees, fill storage to near-full and record the fd baseline."""
        depth = 10  # directory depth
        if not os.path.exists(get_local_path("tree_rand")):
            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
        if not os.path.exists(get_local_path("tree_zero")):
            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
        # Fourth whitespace-separated field of the matching df line.
        # NOTE(review): presumably the "Available" column in 1K blocks - confirm.
        storage_size = get_shell_result(
            f"shell \"df {SEP}{MOUNT_POINT} | grep {MOUNT_POINT}\"").split()[3]
        logger.info("storage size =%s", storage_size)
        assert int(storage_size) >= 10
        gen_size = int(storage_size) - 10
        logger.info("gen size = %d", gen_size)
        check_shell(f"shell dd if={SEP}dev/zero bs=1K count={gen_size} of="
                    f"{SEP}{MOUNT_POINT}/gen_{gen_size}.img")
        time.sleep(1)
        pid = _hdcd_pid()
        cls.fd_count = _hdcd_fd_count(pid)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
    def test_file_normal(self, is_compress, is_directory, is_zero):
        assert self.re_send_file(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
    def test_file_mix(self, is_compress):
        assert self.re_send_file(is_compress=is_compress, is_mix=True)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_compress_z_fd_proc(self):
        # No fd of hdcd may still reference a transferred "it_nospace" target.
        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_compress_z_fd_count(self):
        time.sleep(1)
        pid = _hdcd_pid()
        # Compare as integers: the shell output is text, and a string
        # comparison would be lexicographic ("9" > "10").
        assert _to_count(_hdcd_fd_count(pid)) <= _to_count(self.fd_count)


class TestFileNoSpaceFdFullCrash:
    """
    Start many concurrent transfers to a full storage and watch the hdcd fd
    count, then check that no hdcd fault log was produced.
    """
    # Raw fd-count output of hdcd, refreshed while polling.
    fd_count = "0"
    # pid of hdcd captured in setup_class.
    pid = "0"
    # Sender processes started by the test; joined in teardown_class.
    plist = list()

    @classmethod
    def teardown_class(cls):
        """Wait for every sender process started by the test."""
        for p in cls.plist:
            p.join()

    @classmethod
    def setup_class(cls):
        """Generate the local tree, fill storage and record hdcd pid/fd count."""
        depth = 10
        if not os.path.exists(get_local_path("tree")):
            create_binary_tree(depth, get_local_path("tree"), random=True)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell dd if={SEP}dev/zero bs=1K count=10 of={SEP}storage/smallfile")
        # No count: dd runs until the device reports it is out of space.
        check_shell(f"shell dd if={SEP}dev/zero bs=1M of={SEP}storage/largefile")
        check_shell("shell df /storage")
        # Deleting the small file leaves only a few KB free.
        check_shell("shell rm -rf /storage/smallfile")
        check_shell("shell rm /data/log/faultlog/faultlogger/cppcrash*hdcd*")
        time.sleep(1)
        check_shell("shell ls")
        cls.pid = _hdcd_pid()
        cls.fd_count = _hdcd_fd_count(cls.pid)

    def new_process_run(self, cmd):
        """Run ``check_shell(cmd)`` with stdout and stderr redirected to devnull.

        The original descriptors are restored afterwards.
        """
        with open(os.devnull, 'w') as devnull:
            # os.dup2 returns the *target* fd, so the originals must be
            # duplicated first in order to restore them later.
            saved_stdout = os.dup(1)
            saved_stderr = os.dup(2)
            try:
                os.dup2(devnull.fileno(), 1)
                os.dup2(devnull.fileno(), 2)
                check_shell(f"{cmd}")
            finally:
                os.dup2(saved_stdout, 1)
                os.dup2(saved_stderr, 2)
                os.close(saved_stdout)
                os.close(saved_stderr)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_full_no_crash(self):
        for i in range(1, SEND_FILE_PROCESS_COUNT):
            cmd = "file send " + get_local_path("tree") + f" {SEP}storage/it_tree"
            p = multiprocessing.Process(target=self.new_process_run, args=(cmd,))
            p.start()
            self.plist.append(p)

        last_fd_count = 0
        loop_count = 0
        while True:
            self.fd_count = _hdcd_fd_count(self.pid)
            try:
                current = int(self.fd_count)
            except ValueError:
                # Non-numeric output: stop polling.
                logger.warning("ValueError, last count:%d", last_fd_count)
                break
            if current >= FD_COUNT_LIMIT:
                break
            if current < last_fd_count:
                # The fd count went down: stop the server and end the test
                # without running the checks below.
                run_command_with_timeout(f"{GP.hdc_head} kill", 3)
                return
            last_fd_count = current
            loop_count += 1
            if loop_count % 5 == 0:
                logger.warning("fd count is:%s", self.fd_count)
        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
        run_command_with_timeout(f"{GP.hdc_head} kill", 3)

        run_command_with_timeout(f"{GP.hdc_head} wait", 3)
        time.sleep(3)

        assert not check_shell(f"shell ls {SEP}data/log/faultlog/faultlogger/", "-hdcd-")
        new_pid = _hdcd_pid()
        assert not self.pid == new_pid