#!/usr/bin/env python3
"""Check the output of ``lz4 --list`` against files generated on the fly.

When run as a script this module generates sample ``.lz4`` files in the
system temporary directory, runs the unittest cases below against the ``lz4``
binary located relative to this file, and deletes the generated files again.
"""
import subprocess
import time
import glob
import os
import tempfile
import unittest
import sys

SIZES = [3, 11]  # Always 2 sizes
MIB = 1048576
LZ4 = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../lz4")
if not os.path.exists(LZ4):
    LZ4 = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../programs/lz4")
TEMP = tempfile.gettempdir()


class NVerboseFileInfo(object):
    """One row of non-verbose ``--list`` output plus the sizes found on disk.

    The row is split on whitespace into seven string fields: ``frames``,
    ``type``, ``block``, ``compressed``, ``uncompressed``, ``ratio`` and
    ``filename``. ``exp_comp_size`` / ``exp_unc_size`` hold the expected sizes
    in bytes, read from the files in ``TEMP``.
    """

    def __init__(self, line_in):
        self.line = line_in
        fields = line_in.split()
        if len(fields) != 7:
            # errout() terminates the process, so the unpacking below is safe.
            errout(f"Unexpected line: {line_in}")
        (self.frames, self.type, self.block, self.compressed,
         self.uncompressed, self.ratio, self.filename) = fields
        # Get real file sizes
        self.exp_unc_size = 0
        if "concat-all" in self.filename or "2f--content-size" in self.filename:
            # These files are treated as holding every generated size once.
            for size in SIZES:
                self.exp_unc_size += os.path.getsize(f"{TEMP}/test_list_{size}M")
        else:
            # Everything before the first "-" is the uncompressed file's name.
            uncompressed_filename = self.filename.split("-")[0]
            self.exp_unc_size += os.path.getsize(f"{TEMP}/{uncompressed_filename}")
        self.exp_comp_size = os.path.getsize(f"{TEMP}/{self.filename}")


class TestNonVerbose(unittest.TestCase):
    """Checks on the one-row-per-file output of ``lz4 --list -m``."""

    @classmethod
    def setUpClass(cls):
        """List every generated ``.lz4`` file and parse the resulting rows."""
        test_list_files = sorted(glob.glob(f"{TEMP}/test_list_*.lz4"))
        if not test_list_files:
            # Without generate_files() there is nothing to compare against.
            raise unittest.SkipTest(f"no generated test_list_*.lz4 files in {TEMP}")
        cls.nvinfo_list = []
        for filename in test_list_files:
            output = execute(f"{LZ4} --list -m {filename}", print_output=True)
            # The first output line is the column header.
            cls.nvinfo_list.extend(NVerboseFileInfo(line) for line in output[1:])

    def test_frames(self):
        all_concat_frames = 0
        all_concat_index = None
        for i, nvinfo in enumerate(self.nvinfo_list):
            if "concat-all" in nvinfo.filename:
                all_concat_index = i
            elif "2f--content-size" in nvinfo.filename:
                self.assertEqual("2", nvinfo.frames, nvinfo.line)
                all_concat_frames += 2
            else:
                self.assertEqual("1", nvinfo.frames, nvinfo.line)
                all_concat_frames += 1
        self.assertIsNotNone(all_concat_index, "Couldn't find concat-all file index.")
        # concat-all must report as many frames as all other files together.
        concat_info = self.nvinfo_list[all_concat_index]
        self.assertEqual(concat_info.frames, str(all_concat_frames), concat_info.line)

    def test_frame_types(self):
        for nvinfo in self.nvinfo_list:
            if "-lz4f-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line)
            elif "-legc-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line)
            elif "-skip-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line)

    def test_block(self):
        for nvinfo in self.nvinfo_list:
            # if "-leg" in nvinfo.filename or "-skip" in nvinfo.filename:
            #     self.assertEqual(nvinfo.block, "-", nvinfo.line)
            # NOTE(review): generate_files() names these files "...-1f-BD.lz4"
            # and "...-1f-BI.lz4" (single dash before the flag), so neither
            # condition below matches a generated file -- confirm the intent.
            if "--BD" in nvinfo.filename:
                self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line)
            elif "--BI" in nvinfo.filename:
                self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line)

    def test_compressed_size(self):
        for nvinfo in self.nvinfo_list:
            self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line)

    def test_ratio(self):
        for nvinfo in self.nvinfo_list:
            if "--content-size" in nvinfo.filename:
                expected = f"{float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100:.2f}%"
                self.assertEqual(nvinfo.ratio, expected, nvinfo.line)

    def test_uncompressed_size(self):
        for nvinfo in self.nvinfo_list:
            if "--content-size" in nvinfo.filename:
                self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line)


class VerboseFileInfo(object):
    """Parsed verbose ``--list`` output for a single file.

    ``lines[0]`` is stored as ``filename``, ``lines[1]`` is skipped as the
    column header, and every following line becomes a dict in ``frame_list``
    keyed by ``frame``, ``type``, ``block``, ``checksum``, ``compressed``,
    ``uncompressed`` and ``ratio`` (all strings), plus the raw ``line``.
    """

    def __init__(self, lines):
        columns = ["frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio"]
        self.frame_list = []
        # Filled in by the test set-up with the source file of every frame.
        self.file_frame_map = []
        for i, line in enumerate(lines):
            if i == 0:
                self.filename = line
                continue
            elif i == 1:
                # Skip header
                continue
            frame_info = dict(zip(columns, line.split()))
            frame_info["line"] = line
            self.frame_list.append(frame_info)


class TestVerbose(unittest.TestCase):
    """Checks on the per-frame output of ``lz4 --list -m -v``."""

    @classmethod
    def setUpClass(cls):
        """Run the verbose listing and build the expected frame-to-file map."""
        # Even though we're listing 2 files to test that multi-file output
        # works as expected, we're only really interested in testing the
        # output of the concat-all file.
        concat_all = f"{TEMP}/test_list_concat-all.lz4"
        two_frame_files = sorted(glob.glob(f"{TEMP}/test_list_*M-lz4f-2f--content-size.lz4"))
        if not two_frame_files or not os.path.exists(concat_all):
            # Without generate_files() there is nothing to compare against.
            raise unittest.SkipTest(f"no generated test_list_* files in {TEMP}")
        cls.vinfo_list = []
        for filename in two_frame_files:
            output = execute(f"{LZ4} --list -m -v {concat_all} {filename}", print_output=True)
            cls.vinfo_list.extend(VerboseFileInfo(section) for section in split_verbose_sections(output))
        if not cls.vinfo_list:
            raise AssertionError("no per-file section found in verbose --list output")
        # Populate file_frame_map as a reference of the expected info. Sorting
        # mirrors the order generate_files() used to build the concat-all file.
        concat_name = os.path.basename(concat_all)
        file_frame_map = []
        for filename in sorted(glob.glob(f"{TEMP}/test_list_*.lz4")):
            if os.path.basename(filename) == concat_name:
                continue
            file_frame_map.append(filename)
            if "2f--content-size" in filename:
                # This file has 2 frames, so list it twice to keep a one
                # frame <-> one file mapping.
                file_frame_map.append(filename)
        cls.cvinfo = cls.vinfo_list[0]
        cls.cvinfo.file_frame_map = file_frame_map
        cls.cvinfo.compressed_size = os.path.getsize(concat_all)

    def test_filename(self):
        for i, vinfo in enumerate(self.vinfo_list):
            # The parentheses form a regex group, so this matches the
            # "<index>/<count>" text with or without literal parentheses.
            self.assertRegex(vinfo.filename, f"^test_list_.*({i + 1}/{len(self.vinfo_list)})")

    def test_frame_number(self):
        for vinfo in self.vinfo_list:
            for i, frame_info in enumerate(vinfo.frame_list):
                self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])

    def test_frame_type(self):
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source:
                self.assertEqual(frame_info["type"], "LZ4Frame", frame_info["line"])
            elif "-legc-" in source:
                self.assertEqual(frame_info["type"], "LegacyFrame", frame_info["line"])
            elif "-skip-" in source:
                self.assertEqual(frame_info["type"], "SkippableFrame", frame_info["line"])

    def test_block(self):
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            # NOTE(review): generate_files() names these files "...-1f-BD.lz4"
            # and "...-1f-BI.lz4" (single dash before the flag), so neither
            # condition below matches a generated file -- confirm the intent.
            if "--BD" in source:
                self.assertRegex(frame_info["block"], "^B[0-9]+D$", frame_info["line"])
            elif "--BI" in source:
                # The expected value is a pattern, so it needs assertRegex.
                self.assertRegex(frame_info["block"], "^B[0-9]+I$", frame_info["line"])

    def test_checksum(self):
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source and "--no-frame-crc" not in source:
                self.assertEqual(frame_info["checksum"], "XXH32", frame_info["line"])

    def test_compressed(self):
        total = 0
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-2f-" not in source:
                # A single-frame file is exactly as large as its one frame.
                expected_size = os.path.getsize(source)
                self.assertEqual(frame_info["compressed"], str(expected_size), frame_info["line"])
            total += int(frame_info["compressed"])
        self.assertEqual(total, self.cvinfo.compressed_size,
                         f"Expected total sum ({total}) to match {self.cvinfo.filename} filesize")

    def test_uncompressed(self):
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            ffm = self.cvinfo.file_frame_map[i]
            if "-2f-" not in ffm and "--content-size" in ffm:
                expected_size_unc = size_mib_from_name(ffm) * MIB
                self.assertEqual(frame_info["uncompressed"], str(expected_size_unc), frame_info["line"])

    def test_ratio(self):
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            if "--content-size" in self.cvinfo.file_frame_map[i]:
                expected = f"{float(frame_info['compressed']) / float(frame_info['uncompressed']) * 100:.2f}%"
                self.assertEqual(frame_info['ratio'], expected, frame_info["line"])


def split_verbose_sections(lines):
    """Split verbose ``--list`` output into one list of lines per file.

    A section starts at a line beginning with ``test_list`` and stops before
    the last empty line that follows it (ahead of the next section). When no
    empty line follows, the section runs up to the next section start or the
    end of ``lines``.

    Returns a list of sub-lists of ``lines``; empty when no section starts.
    """
    sections = []
    start = None  # index of the current section's first line, None if none yet
    end = None    # index of the last empty line seen inside the section
    for idx, line in enumerate(lines):
        if line.startswith("test_list"):
            if start is not None:
                sections.append(lines[start:idx if end is None else end])
            start, end = idx, None
        elif not line and start is not None:
            end = idx
    if start is not None:
        sections.append(lines[start:len(lines) if end is None else end])
    return sections


def size_mib_from_name(path):
    """Return the integer ``N`` from a ``test_list_<N>M...`` file name.

    Only the base name is inspected, so the characters ``_`` and ``M`` in the
    directory part of ``path`` cannot confuse the parsing.
    """
    name = os.path.basename(path)
    return int(name[name.rindex("_") + 1:name.index("M")])


def to_human(size):
    """Format ``size`` with two decimals and a 1024-based unit suffix.

    Values below 1024 get no suffix; the largest suffix is ``Y`` and values
    beyond its range stay expressed in ``Y``.
    """
    units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
    for unit in units[:-1]:
        if size < 1024.0:
            return f"{size:.2f}{unit}"
        size /= 1024.0
    # Never divide past the last unit, otherwise the number and the suffix
    # would no longer agree.
    return f"{size:.2f}{units[-1]}"


def log(text):
    """Print ``text`` prefixed with the current local date and time."""
    print(time.strftime("%Y/%m/%d %H:%M:%S") + ' - ' + text)


def errout(text, err=1):
    """Log ``text`` and terminate the process with exit status ``err``."""
    log(text)
    sys.exit(err)


def execute(command, print_command=True, print_output=False, print_error=True):
    """Run ``command`` and return its output as a list of lines.

    ``command`` is split on single spaces (no shell, no quoting). When the
    ``QEMU_SYS`` environment variable is set, its value is prepended to the
    command. The returned list holds the stdout lines followed by the stderr
    lines. A non-zero return code ends the process through ``errout``.
    """
    if os.environ.get('QEMU_SYS'):
        command = f"{os.environ['QEMU_SYS']} {command}"
    if print_command:
        log("> " + command)
    completed = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_text = completed.stdout.decode("utf-8")
    stderr_text = completed.stderr.decode("utf-8")
    if print_output:
        if stdout_text:
            print(stdout_text)
        if stderr_text:
            print(stderr_text)
    if completed.returncode != 0:
        # Avoid printing stderr twice when print_output already showed it.
        if stderr_text and not print_output and print_error:
            print(stderr_text)
        errout(f"Failed to run: {command}, {stdout_text + stderr_text}\n")
    return (stdout_text + stderr_text).splitlines()


def cleanup(silent=False):
    """Delete every ``test_list*`` file in ``TEMP``, logging unless ``silent``."""
    for f in glob.glob(f"{TEMP}/test_list*"):
        if not silent:
            log(f"Deleting {f}")
        os.unlink(f)


def datagen(file_name, size):
    """Create ``file_name`` with exactly ``size`` bytes.

    The leading part (``size - size // 2`` bytes) is skipped with a seek and
    never written; the trailing ``size // 2`` bytes are random.
    """
    non_sparse_size = size // 2
    sparse_size = size - non_sparse_size
    with open(file_name, "wb") as f:
        f.seek(sparse_size)
        f.write(os.urandom(non_sparse_size))
        # Writing zero bytes does not extend the file, so pin the length.
        f.truncate(size)


def concat_files(sources, destination):
    """Write the contents of ``sources``, in order, into ``destination``."""
    # "wb" so that a stale destination is replaced instead of appended to.
    with open(destination, 'wb') as outfile:
        for fname in sources:
            with open(fname, 'rb') as infile:
                outfile.write(infile.read())


def generate_files():
    """Create the uncompressed inputs and every ``.lz4`` variant in ``TEMP``."""
    # file format ~ test_list_<size>M-<frametype>-<no_frames>f<create-args>.lz4 ~
    # Generate LZ4Frames
    for i in SIZES:
        filename = f"{TEMP}/test_list_{i}M"
        log(f"Generating {filename}")
        datagen(filename, i * MIB)
        for j in ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]:
            lz4file = f"{filename}-lz4f-1f{j}.lz4"
            execute(f"{LZ4} {j} {filename} {lz4file}")
        # Generate skippable frames
        lz4file = f"{filename}-skip-1f.lz4"
        skipsize = i * 1024
        # 4 magic bytes followed by the payload size as little-endian uint32.
        skipbytes = bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder='little', signed=False)
        with open(lz4file, 'wb') as f:
            f.write(skipbytes)
            f.write(os.urandom(skipsize))
        # Generate legacy frames
        lz4file = f"{filename}-legc-1f.lz4"
        execute(f"{LZ4} -l {filename} {lz4file}")

    # Concatenate --content-size files
    file_list = sorted(glob.glob(f"{TEMP}/test_list_*-lz4f-1f--content-size.lz4"))
    concat_files(file_list, f"{TEMP}/test_list_{sum(SIZES)}M-lz4f-2f--content-size.lz4")

    # Concatenate all files. The list is sorted so that the verbose test can
    # rebuild the same frame order later.
    file_list = sorted(glob.glob(f"{TEMP}/test_list_*.lz4"))
    concat_files(file_list, f"{TEMP}/test_list_concat-all.lz4")


if __name__ == '__main__':
    cleanup()
    generate_files()
    ret = unittest.main(verbosity=2, exit=False)
    cleanup(silent=True)
    sys.exit(not ret.result.wasSuccessful())