#!/usr/bin/python

"""Tests for site_sysinfo."""

__author__ = 'dshi@google.com (Dan Shi)'

import filecmp
import os
import random
import shutil
import tempfile
import unittest

# cPickle only exists on Python 2; fall back to the plain pickle module so
# this test module can still be imported on Python 3.
try:
    import cPickle as pickle
except ImportError:
    import pickle

import common
from autotest_lib.client.bin import site_sysinfo
from autotest_lib.client.common_lib import autotemp


class diffable_logdir_test(unittest.TestCase):
    """Tests for methods in class diffable_logdir."""

    def setUp(self):
        """Initialize a temp directory with test files."""
        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
        self.src_dir = os.path.join(self.tempdir.name, 'src')
        self.dest_dir = os.path.join(self.tempdir.name, 'dest')

        # Files that exist before the first (init) run, one per folder.
        self.existing_files = ['existing_file_' + str(i) for i in range(3)]
        self.existing_files_folder = ['', 'sub', 'sub/sub2']
        self.existing_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.existing_files,
                                     self.existing_files_folder)]

        # Files that the test creates between the two runs.
        self.new_files = ['new_file_' + str(i) for i in range(2)]
        self.new_files_folder = ['sub', 'sub/sub3']
        self.new_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.new_files, self.new_files_folder)]

        # Create some file with random data in source directory.
        for p in self.existing_files_path:
            self.append_text_to_file(str(random.random()), p)

        self.existing_fifo_path = os.path.join(
                self.src_dir, 'sub/sub2/existing_fifo')
        os.mkfifo(self.existing_fifo_path)

    def tearDown(self):
        """Clean up."""
        self.tempdir.clean()

    def append_text_to_file(self, text, file_path):
        """Append text to the end of a file, create the file if not existed.

        @param text: text to be appended to a file.
        @param file_path: path to the file.

        """
        dir_name = os.path.dirname(file_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(file_path, 'a') as f:
            f.write(text)

    def assert_trees_equal(self, dir1, dir2, ignore=None):
        """Assert two directory trees contain the same files.

        @param dir1: the left comparison directory.
        @param dir2: the right comparison directory.
        @param ignore: filenames to ignore (in any directory).

        """
        # Walk the comparison tree iteratively: compare one level, then queue
        # the comparisons of the sub-directories common to both sides.
        dircmps = []
        dircmps.append(filecmp.dircmp(dir1, dir2, ignore))
        while dircmps:
            dcmp = dircmps.pop()
            self.assertEqual(dcmp.left_list, dcmp.right_list)
            self.assertEqual([], dcmp.diff_files)
            dircmps.extend(dcmp.subdirs.values())

    def _dest_path(self, src_path):
        """Map a path under src_dir to the matching path under dest_dir.

        The mapping is done on the path relative to src_dir rather than by
        substituting the text 'src', so it stays correct even when the temp
        directory path itself happens to contain 'src'.

        @param src_path: a path located inside self.src_dir.

        @return: the path with the same relative location in self.dest_dir.

        """
        return os.path.join(self.dest_dir,
                            os.path.relpath(src_path, self.src_dir))

    def test_diffable_logdir_success(self):
        """Test the diff function to save new data from a directory."""
        info = site_sysinfo.diffable_logdir(self.src_dir,
                                            keep_file_hierarchy=False,
                                            append_diff_in_name=False)
        # Run the first time to collect file status.
        info.run(log_dir=None, collect_init_status=True)

        # Add new files to the test directory.
        for file_name, file_path in zip(self.new_files,
                                        self.new_files_path):
            self.append_text_to_file(file_name, file_path)

        # Temp file for existing_file_2, used to hold on the inode. If the
        # file is deleted and recreated, its inode might not change.
        existing_file_2 = self.existing_files_path[2]
        existing_file_2_tmp = existing_file_2 + '_tmp'
        os.rename(existing_file_2, existing_file_2_tmp)

        # Append data to existing file.
        for file_name, file_path in zip(self.existing_files,
                                        self.existing_files_path):
            self.append_text_to_file(file_name, file_path)

        # Remove the tmp file.
        os.remove(existing_file_2_tmp)

        # Add a new FIFO
        new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
        os.mkfifo(new_fifo_path)

        # Run the second time to do diff.
        info.run(self.dest_dir, collect_init_status=False, collect_all=True)

        # Validate files in dest_dir: each one must hold exactly the text
        # written after the first run, which is the file's own name.
        for file_name, file_path in zip(
                self.existing_files + self.new_files,
                self.existing_files_path + self.new_files_path):
            with open(self._dest_path(file_path), 'r') as f:
                self.assertEqual(file_name, f.read())

        # Assert that FIFOs are not in the diff.
        self.assertFalse(
                os.path.exists(self._dest_path(self.existing_fifo_path)),
                msg='Existing FIFO present in diff sysinfo')
        self.assertFalse(
                os.path.exists(self._dest_path(new_fifo_path)),
                msg='New FIFO present in diff sysinfo')

        # With collect_all=True, full sysinfo should also be present. It is
        # expected at the absolute source path re-rooted under dest_dir, hence
        # plain string concatenation (os.path.join would discard dest_dir).
        full_sysinfo_path = self.dest_dir + self.src_dir
        self.assertTrue(os.path.exists(full_sysinfo_path),
                        msg='Full sysinfo not present')

        # Assert that the full sysinfo is present.
        self.assertNotEqual(self.src_dir, full_sysinfo_path)
        self.assert_trees_equal(self.src_dir, full_sysinfo_path)


class LogdirTestCase(unittest.TestCase):
    """Tests logdir.run"""

    def setUp(self):
        """Create empty 'from' and 'to' directories in a fresh temp dir."""
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

        self.from_dir = os.path.join(self.tempdir, 'from')
        self.to_dir = os.path.join(self.tempdir, 'to')
        os.mkdir(self.from_dir)
        os.mkdir(self.to_dir)

    def _destination_path(self, relative_path, from_dir=None):
        """The expected destination path for a subdir of the source directory

        @param relative_path: path relative to the source directory.
        @param from_dir: source directory to use; defaults to self.from_dir.

        @return: to_dir immediately followed by the absolute source path.

        """
        if from_dir is None:
            from_dir = self.from_dir
        # String concatenation is deliberate: the source path is absolute, so
        # os.path.join(self.to_dir, ...) would throw away self.to_dir.
        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))

    def _pickle_round_trip(self, obj):
        """Pickle and immediately unpickle an object.

        @param obj: the object to serialize.

        @return: the object rebuilt from its pickle.

        """
        # base_job uses protocol 2 to pickle. We follow suit.
        return pickle.loads(pickle.dumps(obj, protocol=2))

    def test_run_recreates_absolute_source_path(self):
        """When copying files, the absolute path from the source is recreated
        in the destination folder.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_skips_symlinks(self):
        """A symlink inside the source directory is not copied, while the
        real directory it points to is.
        """
        os.mkdir(os.path.join(self.from_dir, 'real'))
        os.symlink(os.path.join(self.from_dir, 'real'),
                   os.path.join(self.from_dir, 'symlink'))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path_real = self._destination_path('real')
        self.assertTrue(os.path.exists(destination_path_real),
                        msg='real directory was not copied to %s' %
                        destination_path_real)
        destination_path_link = self._destination_path('symlink')
        self.assertFalse(
                os.path.exists(destination_path_link),
                msg='symlink was copied to %s' % destination_path_link)

    def test_run_resolves_symlinks_to_source_root(self):
        """run tries hard to get to the source directory before copying.

        Within the source folder, we skip any symlinks, but we first try to
        resolve symlinks to the source root itself.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        from_symlink = os.path.join(self.tempdir, 'from_symlink')
        os.symlink(self.from_dir, from_symlink)

        logdir = site_sysinfo.logdir(from_symlink)
        logdir.run(self.to_dir)

        # The copy is expected under the resolved (real) source path, not
        # under the path of the symlink that was handed to logdir.
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_excludes_common_patterns(self):
        """With no explicit excludes, entries named like autoserv* and
        system.journal are not copied, at the top level or deeper.
        """
        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
        # Create empty file.
        open(os.path.join(self.from_dir, 'system.journal'), 'w').close()
        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
        os.makedirs(os.path.join(self.from_dir, deeper_subdir))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        for banned in ('autoserv2344', deeper_subdir, 'system.journal'):
            destination_path = self._destination_path(banned)
            self.assertFalse(os.path.exists(destination_path),
                             msg='Copied banned file to %s' % destination_path)

    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
        """Exclude patterns should only be applied to path suffixes within
        from_dir, not to the root directory itself.
        """
        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
        logdir = site_sysinfo.logdir(exclude_pattern_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar',
                                                  from_dir=exclude_pattern_dir)
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_pickle_unpickle_equal(self):
        """Sanity check pickle-unpickle round-trip."""
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        unpickled_logdir = self._pickle_round_trip(logdir)

        self.assertEqual(unpickled_logdir, logdir)

    def test_pickle_enforce_required_attributes(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        logdir = self._pickle_round_trip(logdir)

        self.assertEqual(logdir.additional_exclude, 'a')

    def test_pickle_enforce_required_attributes_default(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        This variant covers a logdir built without explicit excludes, for
        which additional_exclude must still exist and be None.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        logdir = self._pickle_round_trip(logdir)

        self.assertIsNone(logdir.additional_exclude)

    def test_unpickle_handle_missing__excludes(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        Here the logdir was built without explicit excludes, so the restored
        _excludes must equal DEFAULT_EXCLUDES.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # Simulate a pickle that carries no _excludes attribute.
        delattr(logdir, '_excludes')
        logdir = self._pickle_round_trip(logdir)

        self.assertEqual(logdir._excludes,
                         site_sysinfo.logdir.DEFAULT_EXCLUDES)

    def test_unpickle_handle_missing__excludes_default(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        Here the logdir was built with one extra exclude ('a'), so the
        restored _excludes must be DEFAULT_EXCLUDES plus that extra entry.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # Simulate a pickle that carries no _excludes attribute.
        delattr(logdir, '_excludes')
        logdir = self._pickle_round_trip(logdir)

        self.assertEqual(
                logdir._excludes,
                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))


if __name__ == '__main__':
    unittest.main()