• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import os
11import shutil
12import tempfile
13import time
14import unittest
15
16import common
17from autotest_lib.client.bin.result_tools import dedupe_file_throttler
18from autotest_lib.client.bin.result_tools import result_info
19from autotest_lib.client.bin.result_tools import unittest_lib
20from six.moves import range
21
22
23# Set to 0 to force maximum throttling.
24MAX_RESULT_SIZE_KB = 0
25
26class DedupeFileThrottleTest(unittest.TestCase):
27    """Test class for dedupe_file_throttler.throttle method."""
28
29    def setUp(self):
30        """Setup directory for test."""
31        self.test_dir = tempfile.mkdtemp()
32        self.files_to_keep = []
33        self.files_to_delete = []
34        modification_time = int(time.time()) - 1000
35
36        # Files to be deduped in the root directory of result dir.
37        for i in range(6):
38            file_name = 'file_%d' % i
39            f = os.path.join(self.test_dir, file_name)
40            unittest_lib.create_file(f, unittest_lib.SIZE)
41            os.utime(f, (modification_time, modification_time))
42            modification_time += 1
43            if (i < dedupe_file_throttler.OLDEST_FILES_TO_KEEP_COUNT or
44                i >= 6 - dedupe_file_throttler.NEWEST_FILES_TO_KEEP_COUNT):
45                self.files_to_keep.append(f)
46            else:
47                self.files_to_delete.append(f)
48
49        folder1 = os.path.join(self.test_dir, 'folder1')
50        os.mkdir(folder1)
51
52        # Files should not be deduped.
53        for i in range(3):
54            file_name = 'file_not_dedupe_%d' % i
55            f = os.path.join(folder1, file_name)
56            unittest_lib.create_file(f, unittest_lib.SIZE)
57            self.files_to_keep.append(f)
58
59        # Files to be deduped in the sub directory of result dir.
60        for i in range(10):
61            file_name = 'file_in_sub_dir%d' % i
62            f = os.path.join(folder1, file_name)
63            unittest_lib.create_file(f, unittest_lib.SIZE)
64            os.utime(f, (modification_time, modification_time))
65            modification_time += 1
66            if (i < dedupe_file_throttler.OLDEST_FILES_TO_KEEP_COUNT or
67                i >= 10 - dedupe_file_throttler.NEWEST_FILES_TO_KEEP_COUNT):
68                self.files_to_keep.append(f)
69            else:
70                self.files_to_delete.append(f)
71
72    def tearDown(self):
73        """Cleanup the test directory."""
74        shutil.rmtree(self.test_dir, ignore_errors=True)
75
76    def testTrim(self):
77        """Test throttle method."""
78        summary = result_info.ResultInfo.build_from_path(self.test_dir)
79        dedupe_file_throttler.throttle(
80                summary, max_result_size_KB=MAX_RESULT_SIZE_KB)
81
82        # Verify summary sizes are updated.
83        self.assertEqual(19 * unittest_lib.SIZE, summary.original_size)
84        self.assertEqual(9 * unittest_lib.SIZE, summary.trimmed_size)
85
86        # Verify files that should not be deleted still exists.
87        for f in self.files_to_keep:
88            self.assertTrue(os.stat(f).st_size > 0,
89                            'File %s should not be deleted!' % f)
90
91        # Verify files that should be deleted no longer exists.
92        for f in self.files_to_delete:
93            self.assertFalse(os.path.exists(f), 'File %s is not deleted!' % f)
94
95
96# this is so the test can be run in standalone mode
97if __name__ == '__main__':
98    """Main"""
99    unittest.main()
100