1# Lint as: python2, python3 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import os 11import shutil 12import tempfile 13import time 14import unittest 15 16import common 17from autotest_lib.client.bin.result_tools import dedupe_file_throttler 18from autotest_lib.client.bin.result_tools import result_info 19from autotest_lib.client.bin.result_tools import unittest_lib 20from six.moves import range 21 22 23# Set to 0 to force maximum throttling. 24MAX_RESULT_SIZE_KB = 0 25 26class DedupeFileThrottleTest(unittest.TestCase): 27 """Test class for dedupe_file_throttler.throttle method.""" 28 29 def setUp(self): 30 """Setup directory for test.""" 31 self.test_dir = tempfile.mkdtemp() 32 self.files_to_keep = [] 33 self.files_to_delete = [] 34 modification_time = int(time.time()) - 1000 35 36 # Files to be deduped in the root directory of result dir. 37 for i in range(6): 38 file_name = 'file_%d' % i 39 f = os.path.join(self.test_dir, file_name) 40 unittest_lib.create_file(f, unittest_lib.SIZE) 41 os.utime(f, (modification_time, modification_time)) 42 modification_time += 1 43 if (i < dedupe_file_throttler.OLDEST_FILES_TO_KEEP_COUNT or 44 i >= 6 - dedupe_file_throttler.NEWEST_FILES_TO_KEEP_COUNT): 45 self.files_to_keep.append(f) 46 else: 47 self.files_to_delete.append(f) 48 49 folder1 = os.path.join(self.test_dir, 'folder1') 50 os.mkdir(folder1) 51 52 # Files should not be deduped. 53 for i in range(3): 54 file_name = 'file_not_dedupe_%d' % i 55 f = os.path.join(folder1, file_name) 56 unittest_lib.create_file(f, unittest_lib.SIZE) 57 self.files_to_keep.append(f) 58 59 # Files to be deduped in the sub directory of result dir. 60 for i in range(10): 61 file_name = 'file_in_sub_dir%d' % i 62 f = os.path.join(folder1, file_name) 63 unittest_lib.create_file(f, unittest_lib.SIZE) 64 os.utime(f, (modification_time, modification_time)) 65 modification_time += 1 66 if (i < dedupe_file_throttler.OLDEST_FILES_TO_KEEP_COUNT or 67 i >= 10 - dedupe_file_throttler.NEWEST_FILES_TO_KEEP_COUNT): 68 self.files_to_keep.append(f) 69 else: 70 self.files_to_delete.append(f) 71 72 def tearDown(self): 73 """Cleanup the test directory.""" 74 shutil.rmtree(self.test_dir, ignore_errors=True) 75 76 def testTrim(self): 77 """Test throttle method.""" 78 summary = result_info.ResultInfo.build_from_path(self.test_dir) 79 dedupe_file_throttler.throttle( 80 summary, max_result_size_KB=MAX_RESULT_SIZE_KB) 81 82 # Verify summary sizes are updated. 83 self.assertEqual(19 * unittest_lib.SIZE, summary.original_size) 84 self.assertEqual(9 * unittest_lib.SIZE, summary.trimmed_size) 85 86 # Verify files that should not be deleted still exists. 87 for f in self.files_to_keep: 88 self.assertTrue(os.stat(f).st_size > 0, 89 'File %s should not be deleted!' % f) 90 91 # Verify files that should be deleted no longer exists. 92 for f in self.files_to_delete: 93 self.assertFalse(os.path.exists(f), 'File %s is not deleted!' % f) 94 95 96# this is so the test can be run in standalone mode 97if __name__ == '__main__': 98 """Main""" 99 unittest.main() 100