• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! /usr/bin/env python3
2import subprocess
3import time
4import glob
5import os
6import tempfile
7import unittest
8
9SIZES = [3, 11]  # Always 2 sizes
10MIB = 1048576
11LZ4 = os.path.dirname(os.path.realpath(__file__)) + "/../lz4"
12if not os.path.exists(LZ4):
13    LZ4 = os.path.dirname(os.path.realpath(__file__)) + "/../programs/lz4"
14TEMP = tempfile.gettempdir()
15
16
17class NVerboseFileInfo(object):
18    def __init__(self, line_in):
19        self.line = line_in
20        splitlines = line_in.split()
21        if len(splitlines) != 7:
22            errout("Unexpected line: {}".format(line_in))
23        self.frames, self.type, self.block, self.compressed, self.uncompressed, self.ratio, self.filename = splitlines
24        self.exp_unc_size = 0
25        # Get real file sizes
26        if "concat-all" in self.filename or "2f--content-size" in self.filename:
27            for i in SIZES:
28                self.exp_unc_size += os.path.getsize("{}/test_list_{}M".format(TEMP, i))
29        else:
30            uncompressed_filename = self.filename.split("-")[0]
31            self.exp_unc_size += os.path.getsize("{}/{}".format(TEMP, uncompressed_filename))
32        self.exp_comp_size = os.path.getsize("{}/{}".format(TEMP, self.filename))
33
34
35class TestNonVerbose(unittest.TestCase):
36    @classmethod
37    def setUpClass(self):
38        self.nvinfo_list = []
39        for i, line in enumerate(execute("{} --list -m {}/test_list_*.lz4".format(LZ4, TEMP), print_output=True)):
40            if i > 0:
41                self.nvinfo_list.append(NVerboseFileInfo(line))
42
43    def test_frames(self):
44        all_concat_frames = 0
45        all_concat_index = None
46        for i, nvinfo in enumerate(self.nvinfo_list):
47            if "concat-all" in nvinfo.filename:
48                all_concat_index = i
49            elif "2f--content-size" in nvinfo.filename:
50                self.assertEqual("2", nvinfo.frames, nvinfo.line)
51                all_concat_frames += 2
52            else:
53                self.assertEqual("1", nvinfo.frames, nvinfo.line)
54                all_concat_frames += 1
55        self.assertNotEqual(None, all_concat_index, "Couldn't find concat-all file index.")
56        self.assertEqual(self.nvinfo_list[all_concat_index].frames, str(all_concat_frames), self.nvinfo_list[all_concat_index].line)
57
58    def test_frame_types(self):
59        for nvinfo in self.nvinfo_list:
60            if "-lz4f-" in nvinfo.filename:
61                self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line)
62            elif "-legc-" in nvinfo.filename:
63                self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line)
64            elif "-skip-" in nvinfo.filename:
65                self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line)
66
67    def test_block(self):
68        for nvinfo in self.nvinfo_list:
69            # if "-leg" in nvinfo.filename or "-skip" in nvinfo.filename:
70            #     self.assertEqual(nvinfo.block, "-", nvinfo.line)
71            if "--BD" in nvinfo.filename:
72                self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line)
73            elif "--BI" in nvinfo.filename:
74                self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line)
75
76    def test_compressed_size(self):
77        for nvinfo in self.nvinfo_list:
78            self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line)
79
80    def test_ratio(self):
81        for nvinfo in self.nvinfo_list:
82            if "--content-size" in nvinfo.filename:
83                self.assertEqual(nvinfo.ratio, "{:.2f}%".format(float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100), nvinfo.line)
84
85    def test_uncompressed_size(self):
86        for nvinfo in self.nvinfo_list:
87            if "--content-size" in nvinfo.filename:
88                self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line)
89
90
91class VerboseFileInfo(object):
92    def __init__(self, lines):
93        # Parse lines
94        self.frame_list = []
95        self.file_frame_map = []
96        for i, line in enumerate(lines):
97            if i == 0:
98                self.filename = line
99                continue
100            elif i == 1:
101                # Skip header
102                continue
103            frame_info = dict(zip(["frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio"], line.split()))
104            frame_info["line"] = line
105            self.frame_list.append(frame_info)
106
107
108class TestVerbose(unittest.TestCase):
109    @classmethod
110    def setUpClass(self):
111        # Even do we're listing 2 files to test multiline working as expected.
112        # we're only really interested in testing the output of the concat-all file.
113        self.vinfo_list = []
114        start = end = 0
115        output = execute("{} --list -m -v {}/test_list_concat-all.lz4 {}/test_list_*M-lz4f-2f--content-size.lz4".format(LZ4, TEMP, TEMP), print_output=True)
116        for i, line in enumerate(output):
117            if line.startswith("test_list"):
118                if start != 0 and end != 0:
119                    self.vinfo_list.append(VerboseFileInfo(output[start:end]))
120                start = i
121            if not line:
122                end = i
123        self.vinfo_list.append(VerboseFileInfo(output[start:end]))
124        # Populate file_frame_map as a reference of the expected info
125        concat_file_list = glob.glob("/tmp/test_list_[!concat]*.lz4")
126        # One of the files has 2 frames so duplicate it in this list to map each frame 1 to a single file
127        for i, filename in enumerate(concat_file_list):
128            if "2f--content-size" in filename:
129                concat_file_list.insert(i, filename)
130                break
131        self.cvinfo = self.vinfo_list[0]
132        self.cvinfo.file_frame_map = concat_file_list
133        self.cvinfo.compressed_size = os.path.getsize("{}/test_list_concat-all.lz4".format(TEMP))
134
135    def test_filename(self):
136        for i, vinfo in enumerate(self.vinfo_list):
137            self.assertRegex(vinfo.filename, "^test_list_.*({}/{})".format(i + 1, len(self.vinfo_list)))
138
139    def test_frame_number(self):
140        for vinfo in self.vinfo_list:
141            for i, frame_info in enumerate(vinfo.frame_list):
142                self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])
143
144    def test_frame_type(self):
145        for i, frame_info in enumerate(self.cvinfo.frame_list):
146            if "-lz4f-" in self.cvinfo.file_frame_map[i]:
147                self.assertEqual(self.cvinfo.frame_list[i]["type"], "LZ4Frame", self.cvinfo.frame_list[i]["line"])
148            elif "-legc-" in self.cvinfo.file_frame_map[i]:
149                self.assertEqual(self.cvinfo.frame_list[i]["type"], "LegacyFrame", self.cvinfo.frame_list[i]["line"])
150            elif "-skip-" in self.cvinfo.file_frame_map[i]:
151                self.assertEqual(self.cvinfo.frame_list[i]["type"], "SkippableFrame", self.cvinfo.frame_list[i]["line"])
152
153    def test_block(self):
154        for i, frame_info in enumerate(self.cvinfo.frame_list):
155            if "--BD" in self.cvinfo.file_frame_map[i]:
156                self.assertRegex(self.cvinfo.frame_list[i]["block"], "^B[0-9]+D$", self.cvinfo.frame_list[i]["line"])
157            elif "--BI" in self.cvinfo.file_frame_map[i]:
158                self.assertEqual(self.cvinfo.frame_list[i]["block"], "^B[0-9]+I$", self.cvinfo.frame_list[i]["line"])
159
160    def test_checksum(self):
161        for i, frame_info in enumerate(self.cvinfo.frame_list):
162            if "-lz4f-" in self.cvinfo.file_frame_map[i] and "--no-frame-crc" not in self.cvinfo.file_frame_map[i]:
163                self.assertEqual(self.cvinfo.frame_list[i]["checksum"], "XXH32", self.cvinfo.frame_list[i]["line"])
164
165    def test_compressed(self):
166        total = 0
167        for i, frame_info in enumerate(self.cvinfo.frame_list):
168            if "-2f-" not in self.cvinfo.file_frame_map[i]:
169                expected_size = os.path.getsize(self.cvinfo.file_frame_map[i])
170                self.assertEqual(self.cvinfo.frame_list[i]["compressed"], str(expected_size), self.cvinfo.frame_list[i]["line"])
171            total += int(self.cvinfo.frame_list[i]["compressed"])
172        self.assertEqual(total, self.cvinfo.compressed_size, "Expected total sum ({}) to match {} filesize".format(total, self.cvinfo.filename))
173
174    def test_uncompressed(self):
175        for i, frame_info in enumerate(self.cvinfo.frame_list):
176            ffm = self.cvinfo.file_frame_map[i]
177            if "-2f-" not in ffm and "--content-size" in ffm:
178                expected_size_unc = int(ffm[ffm.rindex("_") + 1:ffm.index("M")]) * 1048576
179                self.assertEqual(self.cvinfo.frame_list[i]["uncompressed"], str(expected_size_unc), self.cvinfo.frame_list[i]["line"])
180
181    def test_ratio(self):
182        for i, frame_info in enumerate(self.cvinfo.frame_list):
183            if "--content-size" in self.cvinfo.file_frame_map[i]:
184                self.assertEqual(self.cvinfo.frame_list[i]['ratio'],
185                                 "{:.2f}%".format(float(self.cvinfo.frame_list[i]['compressed']) / float(self.cvinfo.frame_list[i]['uncompressed']) * 100),
186                                 self.cvinfo.frame_list[i]["line"])
187
188
189def to_human(size):
190    for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']:
191        if size < 1024.0:
192            break
193        size /= 1024.0
194    return "{:.2f}{}".format(size, unit)
195
196
197def log(text):
198    print(time.strftime("%Y/%m/%d %H:%M:%S") + ' - ' + text)
199
200
201def errout(text, err=1):
202    log(text)
203    exit(err)
204
205
206def execute(command, print_command=True, print_output=False, print_error=True, param_shell=True):
207    if os.environ.get('QEMU_SYS'):
208        command = "{} {}".format(os.environ['QEMU_SYS'], command)
209    if print_command:
210        log("> " + command)
211    popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=param_shell)
212    stdout_lines, stderr_lines = popen.communicate()
213    stderr_lines = stderr_lines.decode("utf-8")
214    stdout_lines = stdout_lines.decode("utf-8")
215    if print_output:
216        if stdout_lines:
217            print(stdout_lines)
218        if stderr_lines:
219            print(stderr_lines)
220    if popen.returncode is not None and popen.returncode != 0:
221        if stderr_lines and not print_output and print_error:
222            print(stderr_lines)
223        errout("Failed to run: {}\n".format(command, stdout_lines + stderr_lines))
224    return (stdout_lines + stderr_lines).splitlines()
225
226
227def cleanup(silent=False):
228    for f in glob.glob("{}/test_list*".format(TEMP)):
229        if not silent:
230            log("Deleting {}".format(f))
231        os.unlink(f)
232
233
234def datagen(file_name, size):
235    non_sparse_size = size // 2
236    sparse_size = size - non_sparse_size
237    with open(file_name, "wb") as f:
238        f.seek(sparse_size)
239        f.write(os.urandom(non_sparse_size))
240
241
242def generate_files():
243    # file format  ~ test_list<frametype>-<no_frames>f<create-args>.lz4 ~
244    # Generate LZ4Frames
245    for i in SIZES:
246        filename = "{}/test_list_{}M".format(TEMP, i)
247        log("Generating {}".format(filename))
248        datagen(filename, i * MIB)
249        for j in ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]:
250            lz4file = "{}-lz4f-1f{}.lz4".format(filename, j)
251            execute("{} {} {} {}".format(LZ4, j, filename, lz4file))
252        # Generate skippable frames
253        lz4file = "{}-skip-1f.lz4".format(filename)
254        skipsize = i * 1024
255        skipbytes = bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder='little', signed=False)
256        with open(lz4file, 'wb') as f:
257            f.write(skipbytes)
258            f.write(os.urandom(skipsize))
259        # Generate legacy frames
260        lz4file = "{}-legc-1f.lz4".format(filename)
261        execute("{} -l {} {}".format(LZ4, filename, lz4file))
262
263    # Concatenate --content-size files
264    file_list = glob.glob("{}/test_list_*-lz4f-1f--content-size.lz4".format(TEMP))
265    with open("{}/test_list_{}M-lz4f-2f--content-size.lz4".format(TEMP, sum(SIZES)), 'ab') as outfile:
266        for fname in file_list:
267            with open(fname, 'rb') as infile:
268                outfile.write(infile.read())
269
270    # Concatenate all files
271    file_list = glob.glob("{}/test_list_*.lz4".format(TEMP))
272    with open("{}/test_list_concat-all.lz4".format(TEMP), 'ab') as outfile:
273        for fname in file_list:
274            with open(fname, 'rb') as infile:
275                outfile.write(infile.read())
276
277
278if __name__ == '__main__':
279    cleanup()
280    generate_files()
281    unittest.main(verbosity=2, exit=False)
282    cleanup(silent=True)
283