1#! /usr/bin/env python3 2import subprocess 3import time 4import glob 5import os 6import tempfile 7import unittest 8 9SIZES = [3, 11] # Always 2 sizes 10MIB = 1048576 11LZ4 = os.path.dirname(os.path.realpath(__file__)) + "/../lz4" 12if not os.path.exists(LZ4): 13 LZ4 = os.path.dirname(os.path.realpath(__file__)) + "/../programs/lz4" 14TEMP = tempfile.gettempdir() 15 16 17class NVerboseFileInfo(object): 18 def __init__(self, line_in): 19 self.line = line_in 20 splitlines = line_in.split() 21 if len(splitlines) != 7: 22 errout("Unexpected line: {}".format(line_in)) 23 self.frames, self.type, self.block, self.compressed, self.uncompressed, self.ratio, self.filename = splitlines 24 self.exp_unc_size = 0 25 # Get real file sizes 26 if "concat-all" in self.filename or "2f--content-size" in self.filename: 27 for i in SIZES: 28 self.exp_unc_size += os.path.getsize("{}/test_list_{}M".format(TEMP, i)) 29 else: 30 uncompressed_filename = self.filename.split("-")[0] 31 self.exp_unc_size += os.path.getsize("{}/{}".format(TEMP, uncompressed_filename)) 32 self.exp_comp_size = os.path.getsize("{}/{}".format(TEMP, self.filename)) 33 34 35class TestNonVerbose(unittest.TestCase): 36 @classmethod 37 def setUpClass(self): 38 self.nvinfo_list = [] 39 for i, line in enumerate(execute("{} --list -m {}/test_list_*.lz4".format(LZ4, TEMP), print_output=True)): 40 if i > 0: 41 self.nvinfo_list.append(NVerboseFileInfo(line)) 42 43 def test_frames(self): 44 all_concat_frames = 0 45 all_concat_index = None 46 for i, nvinfo in enumerate(self.nvinfo_list): 47 if "concat-all" in nvinfo.filename: 48 all_concat_index = i 49 elif "2f--content-size" in nvinfo.filename: 50 self.assertEqual("2", nvinfo.frames, nvinfo.line) 51 all_concat_frames += 2 52 else: 53 self.assertEqual("1", nvinfo.frames, nvinfo.line) 54 all_concat_frames += 1 55 self.assertNotEqual(None, all_concat_index, "Couldn't find concat-all file index.") 56 self.assertEqual(self.nvinfo_list[all_concat_index].frames, str(all_concat_frames), self.nvinfo_list[all_concat_index].line) 57 58 def test_frame_types(self): 59 for nvinfo in self.nvinfo_list: 60 if "-lz4f-" in nvinfo.filename: 61 self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line) 62 elif "-legc-" in nvinfo.filename: 63 self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line) 64 elif "-skip-" in nvinfo.filename: 65 self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line) 66 67 def test_block(self): 68 for nvinfo in self.nvinfo_list: 69 # if "-leg" in nvinfo.filename or "-skip" in nvinfo.filename: 70 # self.assertEqual(nvinfo.block, "-", nvinfo.line) 71 if "--BD" in nvinfo.filename: 72 self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line) 73 elif "--BI" in nvinfo.filename: 74 self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line) 75 76 def test_compressed_size(self): 77 for nvinfo in self.nvinfo_list: 78 self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line) 79 80 def test_ratio(self): 81 for nvinfo in self.nvinfo_list: 82 if "--content-size" in nvinfo.filename: 83 self.assertEqual(nvinfo.ratio, "{:.2f}%".format(float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100), nvinfo.line) 84 85 def test_uncompressed_size(self): 86 for nvinfo in self.nvinfo_list: 87 if "--content-size" in nvinfo.filename: 88 self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line) 89 90 91class VerboseFileInfo(object): 92 def __init__(self, lines): 93 # Parse lines 94 self.frame_list = [] 95 self.file_frame_map = [] 96 for i, line in enumerate(lines): 97 if i == 0: 98 self.filename = line 99 continue 100 elif i == 1: 101 # Skip header 102 continue 103 frame_info = dict(zip(["frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio"], line.split())) 104 frame_info["line"] = line 105 self.frame_list.append(frame_info) 106 107 108class TestVerbose(unittest.TestCase): 109 @classmethod 110 def setUpClass(self): 111 # Even do we're listing 2 files to test multiline working as expected. 112 # we're only really interested in testing the output of the concat-all file. 113 self.vinfo_list = [] 114 start = end = 0 115 output = execute("{} --list -m -v {}/test_list_concat-all.lz4 {}/test_list_*M-lz4f-2f--content-size.lz4".format(LZ4, TEMP, TEMP), print_output=True) 116 for i, line in enumerate(output): 117 if line.startswith("test_list"): 118 if start != 0 and end != 0: 119 self.vinfo_list.append(VerboseFileInfo(output[start:end])) 120 start = i 121 if not line: 122 end = i 123 self.vinfo_list.append(VerboseFileInfo(output[start:end])) 124 # Populate file_frame_map as a reference of the expected info 125 concat_file_list = glob.glob("/tmp/test_list_[!concat]*.lz4") 126 # One of the files has 2 frames so duplicate it in this list to map each frame 1 to a single file 127 for i, filename in enumerate(concat_file_list): 128 if "2f--content-size" in filename: 129 concat_file_list.insert(i, filename) 130 break 131 self.cvinfo = self.vinfo_list[0] 132 self.cvinfo.file_frame_map = concat_file_list 133 self.cvinfo.compressed_size = os.path.getsize("{}/test_list_concat-all.lz4".format(TEMP)) 134 135 def test_filename(self): 136 for i, vinfo in enumerate(self.vinfo_list): 137 self.assertRegex(vinfo.filename, "^test_list_.*({}/{})".format(i + 1, len(self.vinfo_list))) 138 139 def test_frame_number(self): 140 for vinfo in self.vinfo_list: 141 for i, frame_info in enumerate(vinfo.frame_list): 142 self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"]) 143 144 def test_frame_type(self): 145 for i, frame_info in enumerate(self.cvinfo.frame_list): 146 if "-lz4f-" in self.cvinfo.file_frame_map[i]: 147 self.assertEqual(self.cvinfo.frame_list[i]["type"], "LZ4Frame", self.cvinfo.frame_list[i]["line"]) 148 elif "-legc-" in self.cvinfo.file_frame_map[i]: 149 self.assertEqual(self.cvinfo.frame_list[i]["type"], "LegacyFrame", self.cvinfo.frame_list[i]["line"]) 150 elif "-skip-" in self.cvinfo.file_frame_map[i]: 151 self.assertEqual(self.cvinfo.frame_list[i]["type"], "SkippableFrame", self.cvinfo.frame_list[i]["line"]) 152 153 def test_block(self): 154 for i, frame_info in enumerate(self.cvinfo.frame_list): 155 if "--BD" in self.cvinfo.file_frame_map[i]: 156 self.assertRegex(self.cvinfo.frame_list[i]["block"], "^B[0-9]+D$", self.cvinfo.frame_list[i]["line"]) 157 elif "--BI" in self.cvinfo.file_frame_map[i]: 158 self.assertEqual(self.cvinfo.frame_list[i]["block"], "^B[0-9]+I$", self.cvinfo.frame_list[i]["line"]) 159 160 def test_checksum(self): 161 for i, frame_info in enumerate(self.cvinfo.frame_list): 162 if "-lz4f-" in self.cvinfo.file_frame_map[i] and "--no-frame-crc" not in self.cvinfo.file_frame_map[i]: 163 self.assertEqual(self.cvinfo.frame_list[i]["checksum"], "XXH32", self.cvinfo.frame_list[i]["line"]) 164 165 def test_compressed(self): 166 total = 0 167 for i, frame_info in enumerate(self.cvinfo.frame_list): 168 if "-2f-" not in self.cvinfo.file_frame_map[i]: 169 expected_size = os.path.getsize(self.cvinfo.file_frame_map[i]) 170 self.assertEqual(self.cvinfo.frame_list[i]["compressed"], str(expected_size), self.cvinfo.frame_list[i]["line"]) 171 total += int(self.cvinfo.frame_list[i]["compressed"]) 172 self.assertEqual(total, self.cvinfo.compressed_size, "Expected total sum ({}) to match {} filesize".format(total, self.cvinfo.filename)) 173 174 def test_uncompressed(self): 175 for i, frame_info in enumerate(self.cvinfo.frame_list): 176 ffm = self.cvinfo.file_frame_map[i] 177 if "-2f-" not in ffm and "--content-size" in ffm: 178 expected_size_unc = int(ffm[ffm.rindex("_") + 1:ffm.index("M")]) * 1048576 179 self.assertEqual(self.cvinfo.frame_list[i]["uncompressed"], str(expected_size_unc), self.cvinfo.frame_list[i]["line"]) 180 181 def test_ratio(self): 182 for i, frame_info in enumerate(self.cvinfo.frame_list): 183 if "--content-size" in self.cvinfo.file_frame_map[i]: 184 self.assertEqual(self.cvinfo.frame_list[i]['ratio'], 185 "{:.2f}%".format(float(self.cvinfo.frame_list[i]['compressed']) / float(self.cvinfo.frame_list[i]['uncompressed']) * 100), 186 self.cvinfo.frame_list[i]["line"]) 187 188 189def to_human(size): 190 for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']: 191 if size < 1024.0: 192 break 193 size /= 1024.0 194 return "{:.2f}{}".format(size, unit) 195 196 197def log(text): 198 print(time.strftime("%Y/%m/%d %H:%M:%S") + ' - ' + text) 199 200 201def errout(text, err=1): 202 log(text) 203 exit(err) 204 205 206def execute(command, print_command=True, print_output=False, print_error=True, param_shell=True): 207 if os.environ.get('QEMU_SYS'): 208 command = "{} {}".format(os.environ['QEMU_SYS'], command) 209 if print_command: 210 log("> " + command) 211 popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=param_shell) 212 stdout_lines, stderr_lines = popen.communicate() 213 stderr_lines = stderr_lines.decode("utf-8") 214 stdout_lines = stdout_lines.decode("utf-8") 215 if print_output: 216 if stdout_lines: 217 print(stdout_lines) 218 if stderr_lines: 219 print(stderr_lines) 220 if popen.returncode is not None and popen.returncode != 0: 221 if stderr_lines and not print_output and print_error: 222 print(stderr_lines) 223 errout("Failed to run: {}\n".format(command, stdout_lines + stderr_lines)) 224 return (stdout_lines + stderr_lines).splitlines() 225 226 227def cleanup(silent=False): 228 for f in glob.glob("{}/test_list*".format(TEMP)): 229 if not silent: 230 log("Deleting {}".format(f)) 231 os.unlink(f) 232 233 234def datagen(file_name, size): 235 non_sparse_size = size // 2 236 sparse_size = size - non_sparse_size 237 with open(file_name, "wb") as f: 238 f.seek(sparse_size) 239 f.write(os.urandom(non_sparse_size)) 240 241 242def generate_files(): 243 # file format ~ test_list<frametype>-<no_frames>f<create-args>.lz4 ~ 244 # Generate LZ4Frames 245 for i in SIZES: 246 filename = "{}/test_list_{}M".format(TEMP, i) 247 log("Generating {}".format(filename)) 248 datagen(filename, i * MIB) 249 for j in ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]: 250 lz4file = "{}-lz4f-1f{}.lz4".format(filename, j) 251 execute("{} {} {} {}".format(LZ4, j, filename, lz4file)) 252 # Generate skippable frames 253 lz4file = "{}-skip-1f.lz4".format(filename) 254 skipsize = i * 1024 255 skipbytes = bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder='little', signed=False) 256 with open(lz4file, 'wb') as f: 257 f.write(skipbytes) 258 f.write(os.urandom(skipsize)) 259 # Generate legacy frames 260 lz4file = "{}-legc-1f.lz4".format(filename) 261 execute("{} -l {} {}".format(LZ4, filename, lz4file)) 262 263 # Concatenate --content-size files 264 file_list = glob.glob("{}/test_list_*-lz4f-1f--content-size.lz4".format(TEMP)) 265 with open("{}/test_list_{}M-lz4f-2f--content-size.lz4".format(TEMP, sum(SIZES)), 'ab') as outfile: 266 for fname in file_list: 267 with open(fname, 'rb') as infile: 268 outfile.write(infile.read()) 269 270 # Concatenate all files 271 file_list = glob.glob("{}/test_list_*.lz4".format(TEMP)) 272 with open("{}/test_list_concat-all.lz4".format(TEMP), 'ab') as outfile: 273 for fname in file_list: 274 with open(fname, 'rb') as infile: 275 outfile.write(infile.read()) 276 277 278if __name__ == '__main__': 279 cleanup() 280 generate_files() 281 unittest.main(verbosity=2, exit=False) 282 cleanup(silent=True) 283