• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2import subprocess
3import time
4import glob
5import os
6import tempfile
7import unittest
8import sys
9
# Sizes (in MiB) of the uncompressed fixture files that get generated.
SIZES = [3, 11]  # Always 2 sizes
MIB = 1024 * 1024
# Path of the lz4 binary under test: one directory above this script, with
# ../programs/lz4 as the fallback when that file does not exist.
LZ4 = os.path.abspath(f"{os.path.dirname(os.path.realpath(__file__))}/../lz4")
if not os.path.exists(LZ4):
    LZ4 = os.path.abspath(f"{os.path.dirname(os.path.realpath(__file__))}/../programs/lz4")
# Directory in which every test_list* fixture is created and later removed.
TEMP = tempfile.gettempdir()
16
17
class NVerboseFileInfo(object):
    """One data row of the non-verbose listing, plus the sizes expected from disk.

    The row is split on whitespace into exactly seven columns (frames, type,
    block, compressed, uncompressed, ratio, filename); any other column count
    is reported through ``errout``.
    """

    def __init__(self, line_in):
        self.line = line_in
        fields = line_in.split()
        if len(fields) != 7:
            errout(f"Unexpected line: {line_in}")
        (self.frames, self.type, self.block, self.compressed,
         self.uncompressed, self.ratio, self.filename) = fields

        # Expected uncompressed size, taken from the real fixture files.
        name = self.filename
        if "concat-all" in name or "2f--content-size" in name:
            # These archives are compared against the sum of every raw fixture.
            self.exp_unc_size = sum(os.path.getsize(f"{TEMP}/test_list_{i}M") for i in SIZES)
        else:
            # Everything before the first '-' is the name of the raw fixture.
            raw_name = name.split("-")[0]
            self.exp_unc_size = os.path.getsize(f"{TEMP}/{raw_name}")
        # Expected compressed size is simply the archive's size on disk.
        self.exp_comp_size = os.path.getsize(f"{TEMP}/{name}")
34
35
class TestNonVerbose(unittest.TestCase):
    """Checks on the table printed by ``lz4 --list -m`` for each generated archive."""

    @classmethod
    def setUpClass(cls):
        """List every ``test_list_*.lz4`` file and parse each row after the first."""
        cls.nvinfo_list = []
        for archive in glob.glob(f"{TEMP}/test_list_*.lz4"):
            listing = execute(f"{LZ4} --list -m {archive}", print_output=True)
            # The first output line is skipped; every later line is parsed as a data row.
            cls.nvinfo_list.extend(NVerboseFileInfo(row) for row in listing[1:])

    def test_frames(self):
        """Per-file frame counts are 1 (2 for the 2f file); concat-all reports their sum."""
        expected_total = 0
        concat_pos = None
        for pos, info in enumerate(self.nvinfo_list):
            if "concat-all" in info.filename:
                concat_pos = pos
                continue
            per_file = 2 if "2f--content-size" in info.filename else 1
            self.assertEqual(str(per_file), info.frames, info.line)
            expected_total += per_file
        self.assertNotEqual(None, concat_pos, "Couldn't find concat-all file index.")
        concat_info = self.nvinfo_list[concat_pos]
        self.assertEqual(concat_info.frames, str(expected_total), concat_info.line)

    def test_frame_types(self):
        """The type column matches the frame kind encoded in the file name."""
        type_by_tag = (
            ("-lz4f-", "LZ4Frame"),
            ("-legc-", "LegacyFrame"),
            ("-skip-", "SkippableFrame"),
        )
        for info in self.nvinfo_list:
            for tag, frame_type in type_by_tag:
                if tag in info.filename:
                    self.assertEqual(info.type, frame_type, info.line)
                    break  # only the first matching tag is checked

    def test_block(self):
        """The block column matches the block mode encoded in the file name."""
        # NOTE(review): generate_files() names these archives "...-1f-BD.lz4" and
        # "...-1f-BI.lz4" (single dash before the B), so the "--BD"/"--BI"
        # substrings below do not occur in the generated names - confirm intent.
        pattern_by_tag = (
            ("--BD", "^B[0-9]+D$"),
            ("--BI", "^B[0-9]+I$"),
        )
        for info in self.nvinfo_list:
            for tag, pattern in pattern_by_tag:
                if tag in info.filename:
                    self.assertRegex(info.block, pattern, info.line)
                    break

    def test_compressed_size(self):
        """The compressed column is the archive's on-disk size in human-readable form."""
        for info in self.nvinfo_list:
            self.assertEqual(info.compressed, to_human(info.exp_comp_size), info.line)

    def test_ratio(self):
        """For --content-size archives the ratio is compressed/uncompressed as a percentage."""
        for info in self.nvinfo_list:
            if "--content-size" not in info.filename:
                continue
            percent = float(info.exp_comp_size) / float(info.exp_unc_size) * 100
            self.assertEqual(info.ratio, f"{percent:.2f}%", info.line)

    def test_uncompressed_size(self):
        """For --content-size archives the uncompressed column matches the raw fixture size."""
        for info in self.nvinfo_list:
            if "--content-size" not in info.filename:
                continue
            self.assertEqual(info.uncompressed, to_human(info.exp_unc_size), info.line)
93
94
class VerboseFileInfo(object):
    """Parsed form of one file's section of the verbose listing.

    ``lines`` holds the section as text lines: the first line is stored as
    ``filename``, the second (the column header) is ignored, and each remaining
    line becomes one dict in ``frame_list``.
    """

    def __init__(self, lines):
        columns = ("frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio")
        self.frame_list = []
        # Filled in by the caller with the source file expected for each frame.
        self.file_frame_map = []

        rows = iter(lines)
        # Title line; ``filename`` stays unset when there are no lines at all.
        for title in rows:
            self.filename = title
            break
        # Column header line carries no data.
        next(rows, None)
        for row in rows:
            # Columns are paired positionally; a short row simply yields fewer keys.
            record = dict(zip(columns, row.split()))
            record["line"] = row
            self.frame_list.append(record)
110
111
class TestVerbose(unittest.TestCase):
    """Checks on the per-frame table printed by ``lz4 --list -m -v``."""

    @classmethod
    def setUpClass(cls):
        """Run the verbose listing and split its output into one section per file."""
        # Even though two files are listed (to check that multi-file output works),
        # only the concat-all section is inspected in detail.
        cls.vinfo_list = []
        for filename in glob.glob(f"{TEMP}/test_list_*M-lz4f-2f--content-size.lz4"):
            output = execute(f"{LZ4} --list -m -v {TEMP}/test_list_concat-all.lz4 {filename}", print_output=True)
            # A section starts at a line beginning with "test_list" and ends at the
            # following blank line. None (not 0) marks "not seen yet", so a section
            # whose title is the very first output line is not mistaken for "unset".
            start = end = None
            for idx, line in enumerate(output):
                if line.startswith("test_list"):
                    if start is not None and end is not None:
                        cls.vinfo_list.append(VerboseFileInfo(output[start:end]))
                    start = idx
                if not line:
                    end = idx
            # Flush the last section of this listing.
            if start is not None:
                cls.vinfo_list.append(VerboseFileInfo(output[start:end]))

        # Populate file_frame_map as a reference of the expected info.
        # "[!concat]" is a glob character class: it rejects names whose next
        # character is one of c/o/n/a/t, which skips test_list_concat-all.lz4
        # while keeping the size-prefixed files (they continue with a digit).
        concat_file_list = glob.glob(f"{TEMP}/test_list_[!concat]*.lz4")
        # One of the files has 2 frames, so it is listed twice to get exactly
        # one map entry per frame.
        for idx, filename in enumerate(concat_file_list):
            if "2f--content-size" in filename:
                concat_file_list.insert(idx, filename)
                break
        cls.cvinfo = cls.vinfo_list[0]
        cls.cvinfo.file_frame_map = concat_file_list
        cls.cvinfo.compressed_size = os.path.getsize(f"{TEMP}/test_list_concat-all.lz4")

    def test_filename(self):
        """Each section title ends with a literal ``(index/total)`` counter."""
        total = len(self.vinfo_list)
        for i, vinfo in enumerate(self.vinfo_list):
            # The parentheses are escaped so they are matched literally rather
            # than acting as a regex group.
            self.assertRegex(vinfo.filename, rf"^test_list_.*\({i + 1}/{total}\)")

    def test_frame_number(self):
        """Frames are numbered consecutively from 1 within every section."""
        for vinfo in self.vinfo_list:
            for i, frame_info in enumerate(vinfo.frame_list):
                self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])

    def test_frame_type(self):
        """The type column matches the frame kind encoded in the source file name."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source:
                self.assertEqual(frame_info["type"], "LZ4Frame", frame_info["line"])
            elif "-legc-" in source:
                self.assertEqual(frame_info["type"], "LegacyFrame", frame_info["line"])
            elif "-skip-" in source:
                self.assertEqual(frame_info["type"], "SkippableFrame", frame_info["line"])

    def test_block(self):
        """The block column matches the block mode encoded in the source file name."""
        # NOTE(review): generate_files() names these archives "...-1f-BD.lz4" and
        # "...-1f-BI.lz4" (single dash before the B), so the "--BD"/"--BI"
        # substrings below do not occur in the generated names - confirm intent.
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "--BD" in source:
                self.assertRegex(frame_info["block"], "^B[0-9]+D$", frame_info["line"])
            elif "--BI" in source:
                # The expected value is a pattern, so it must be matched, not compared.
                self.assertRegex(frame_info["block"], "^B[0-9]+I$", frame_info["line"])

    def test_checksum(self):
        """LZ4 frames created without --no-frame-crc report an XXH32 checksum."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source and "--no-frame-crc" not in source:
                self.assertEqual(frame_info["checksum"], "XXH32", frame_info["line"])

    def test_compressed(self):
        """Per-frame compressed sizes match the source files and add up to the archive size."""
        total = 0
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            # A single frame of the 2-frame file cannot be compared to that file's size.
            if "-2f-" not in source:
                expected_size = os.path.getsize(source)
                self.assertEqual(frame_info["compressed"], str(expected_size), frame_info["line"])
            total += int(frame_info["compressed"])
        self.assertEqual(total, self.cvinfo.compressed_size, f"Expected total sum ({total}) to match {self.cvinfo.filename} filesize")

    def test_uncompressed(self):
        """Single-frame --content-size entries report the size encoded in their file name."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            ffm = self.cvinfo.file_frame_map[i]
            if "-2f-" not in ffm and "--content-size" in ffm:
                # Parse "<N>M" from the base name only, so an "M" somewhere in the
                # temporary directory path cannot be mistaken for the size suffix.
                base = os.path.basename(ffm)
                expected_size_unc = int(base[base.rindex("_") + 1:base.index("M")]) * MIB
                self.assertEqual(frame_info["uncompressed"], str(expected_size_unc), frame_info["line"])

    def test_ratio(self):
        """For --content-size entries the ratio is compressed/uncompressed as a percentage."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            if "--content-size" in self.cvinfo.file_frame_map[i]:
                percent = float(frame_info["compressed"]) / float(frame_info["uncompressed"]) * 100
                self.assertEqual(frame_info["ratio"], f"{percent:.2f}%", frame_info["line"])
193
194
def to_human(size):
    """Format a byte count with two decimals and a binary unit suffix.

    The value is divided by 1024 until it is below 1024 or the largest unit
    ('Y') is reached; values beyond that stay expressed in 'Y' rather than
    being divided once more.

    Args:
        size: Non-negative number of bytes (int or float).

    Returns:
        A string such as ``"512.00"``, ``"1.50K"`` or ``"3.00M"``.
    """
    units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
    for unit in units[:-1]:
        if size < 1024.0:
            return f"{size:.2f}{unit}"
        size /= 1024.0
    # Largest unit: no further division, whatever the magnitude.
    return f"{size:.2f}{units[-1]}"
201
202
def log(text):
    """Print *text* prefixed with the current local time (YYYY/MM/DD HH:MM:SS)."""
    stamp = time.strftime("%Y/%m/%d %H:%M:%S")
    print(' - '.join((stamp, text)))
205
206
def errout(text, err=1):
    """Log *text* and terminate the process.

    Args:
        text: Message passed to ``log`` before exiting.
        err: Exit status handed to ``sys.exit`` (defaults to 1).

    Raises:
        SystemExit: always.
    """
    log(text)
    # sys.exit rather than the bare ``exit`` helper: the latter is injected by
    # the ``site`` module for interactive use and is not guaranteed to exist.
    sys.exit(err)
210
211
def execute(command, print_command=True, print_output=False, print_error=True):
    """Run *command* and return its combined output as a list of lines.

    The command string is split on single spaces (no shell, no quoting) and,
    when the QEMU_SYS environment variable is set and non-empty, prefixed with
    its value.

    Args:
        command: Command line as one space-separated string.
        print_command: Log the command before running it.
        print_output: Print captured stdout and stderr after the run.
        print_error: On failure, print stderr if it was not already printed.

    Returns:
        stdout followed by stderr, decoded as UTF-8 and split into lines.
        A non-zero exit status is reported through ``errout``.
    """
    emulator = os.environ.get('QEMU_SYS')
    if emulator:
        command = f"{emulator} {command}"
    if print_command:
        log("> " + command)

    finished = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    err_text = finished.stderr.decode("utf-8")
    out_text = finished.stdout.decode("utf-8")

    if print_output:
        for captured in (out_text, err_text):
            if captured:
                print(captured)

    if finished.returncode != 0:
        # Avoid printing stderr twice when print_output already showed it.
        if err_text and print_error and not print_output:
            print(err_text)
        errout(f"Failed to run: {command}, {out_text + err_text}\n")
    return (out_text + err_text).splitlines()
231
232
def cleanup(silent=False):
    """Delete every ``test_list*`` entry in the temp directory.

    Args:
        silent: When true, skip the per-file "Deleting ..." log line.
    """
    leftovers = glob.glob(f"{TEMP}/test_list*")
    for path in leftovers:
        if silent is False or not silent:
            log(f"Deleting {path}")
        os.unlink(path)
238
239
def datagen(file_name, size):
    """Create *file_name* with *size* bytes: an unwritten gap followed by random data.

    The last ``size // 2`` bytes come from ``os.urandom``; the bytes before
    them are skipped with a seek and never written.

    Args:
        file_name: Path of the file to create (truncated if it exists).
        size: Total length in bytes.
    """
    random_len = size // 2
    gap_len = size - random_len
    with open(file_name, "wb") as out:
        # Jump over the gap first so only the random tail is actually written.
        out.seek(gap_len)
        out.write(os.urandom(random_len))
246
247
def generate_files():
    """Create every fixture used by the listing tests inside the temp directory.

    Naming scheme: test_list_<size>M-<frametype>-<no_frames>f<create-args>.lz4
    For each size in SIZES this produces the raw data file, five LZ4-frame
    archives (one per option), a hand-built skippable frame and a legacy-frame
    archive; afterwards the two concatenated archives are assembled.
    """
    def concatenate(pattern, target):
        # The glob runs before the target is opened, so a target whose name
        # matches the pattern is never one of its own inputs.
        sources = glob.glob(pattern)
        with open(target, 'ab') as outfile:
            for source in sources:
                with open(source, 'rb') as infile:
                    outfile.write(infile.read())

    frame_options = ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]
    for mib_count in SIZES:
        filename = f"{TEMP}/test_list_{mib_count}M"
        log(f"Generating {filename}")
        datagen(filename, mib_count * MIB)

        # LZ4 frames, one archive per option.
        for option in frame_options:
            execute(f"{LZ4} {option} {filename} {filename}-lz4f-1f{option}.lz4")

        # Skippable frame, written by hand: four magic bytes (50 2A 4D 18 in hex),
        # the payload length as a 4-byte little-endian integer, then random payload.
        payload_len = mib_count * 1024
        header = bytes([80, 42, 77, 24]) + payload_len.to_bytes(4, byteorder='little', signed=False)
        with open(f"{filename}-skip-1f.lz4", 'wb') as skip_file:
            skip_file.write(header)
            skip_file.write(os.urandom(payload_len))

        # Legacy frame.
        execute(f"{LZ4} -l {filename} {filename}-legc-1f.lz4")

    # Concatenate --content-size files
    concatenate(f"{TEMP}/test_list_*-lz4f-1f--content-size.lz4",
                f"{TEMP}/test_list_{sum(SIZES)}M-lz4f-2f--content-size.lz4")

    # Concatenate all files
    concatenate(f"{TEMP}/test_list_*.lz4", f"{TEMP}/test_list_concat-all.lz4")
282
283
if __name__ == '__main__':
    # Remove leftovers from earlier runs, build fresh fixtures, run the suite,
    # then remove the fixtures again before reporting the outcome.
    cleanup()
    generate_files()
    program = unittest.main(verbosity=2, exit=False)
    cleanup(silent=True)
    # Exit status 0 on success, 1 on any failure or error.
    sys.exit(0 if program.result.wasSuccessful() else 1)
290