1#!/usr/bin/python2 2 3"""Tests for site_sysinfo.""" 4 5from __future__ import absolute_import 6from __future__ import division 7from __future__ import print_function 8 9 10__author__ = 'dshi@google.com (Dan Shi)' 11 12import six.moves.cPickle as pickle 13import filecmp 14import os 15import random 16import shutil 17import tempfile 18import unittest 19 20import common 21from autotest_lib.client.bin import site_sysinfo 22from autotest_lib.client.common_lib import autotemp 23from six.moves import range 24from six.moves import zip 25 26 27class diffable_logdir_test(unittest.TestCase): 28 """Tests for methods in class diffable_logdir.""" 29 30 31 def setUp(self): 32 """Initialize a temp direcotry with test files.""" 33 self.tempdir = autotemp.tempdir(unique_id='diffable_logdir') 34 self.src_dir = os.path.join(self.tempdir.name, 'src') 35 self.dest_dir = os.path.join(self.tempdir.name, 'dest') 36 37 self.existing_files = ['existing_file_'+str(i) for i in range(3)] 38 self.existing_files_folder = ['', 'sub', 'sub/sub2'] 39 self.existing_files_path = [os.path.join(self.src_dir, folder, f) 40 for f,folder in zip(self.existing_files, 41 self.existing_files_folder)] 42 self.new_files = ['new_file_'+str(i) for i in range(2)] 43 self.new_files_folder = ['sub', 'sub/sub3'] 44 self.new_files_path = [os.path.join(self.src_dir, folder, f) 45 for f,folder in zip(self.new_files, 46 self.new_files_folder)] 47 48 # Create some file with random data in source directory. 49 for p in self.existing_files_path: 50 self.append_text_to_file(str(random.random()), p) 51 52 self.existing_fifo_path = os.path.join( 53 self.src_dir,'sub/sub2/existing_fifo') 54 os.mkfifo(self.existing_fifo_path) 55 56 57 def tearDown(self): 58 """Clearn up.""" 59 self.tempdir.clean() 60 61 62 def append_text_to_file(self, text, file_path): 63 """Append text to the end of a file, create the file if not existed. 64 65 @param text: text to be appended to a file. 66 @param file_path: path to the file. 67 68 """ 69 dir_name = os.path.dirname(file_path) 70 if not os.path.exists(dir_name): 71 os.makedirs(dir_name) 72 with open(file_path, 'a') as f: 73 f.write(text) 74 75 76 def assert_trees_equal(self, dir1, dir2, ignore=None): 77 """Assert two directory trees contain the same files. 78 79 @param dir1: the left comparison directory. 80 @param dir2: the right comparison directory. 81 @param ignore: filenames to ignore (in any directory). 82 83 """ 84 dircmps = [] 85 dircmps.append(filecmp.dircmp(dir1, dir2, ignore)) 86 while dircmps: 87 dcmp = dircmps.pop() 88 self.assertEqual(dcmp.left_list, dcmp.right_list) 89 self.assertEqual([], dcmp.diff_files) 90 dircmps.extend(dcmp.subdirs.values()) 91 92 93 def test_diffable_logdir_success(self): 94 """Test the diff function to save new data from a directory.""" 95 info = site_sysinfo.diffable_logdir(self.src_dir, 96 keep_file_hierarchy=False, 97 append_diff_in_name=False) 98 # Run the first time to collect file status. 99 info.run(log_dir=None, collect_init_status=True) 100 101 # Add new files to the test directory. 102 for file_name, file_path in zip(self.new_files, 103 self.new_files_path): 104 self.append_text_to_file(file_name, file_path) 105 106 # Temp file for existing_file_2, used to hold on the inode. If the 107 # file is deleted and recreated, its inode might not change. 108 existing_file_2 = self.existing_files_path[2] 109 existing_file_2_tmp = existing_file_2 + '_tmp' 110 os.rename(existing_file_2, existing_file_2_tmp) 111 112 # Append data to existing file. 113 for file_name, file_path in zip(self.existing_files, 114 self.existing_files_path): 115 self.append_text_to_file(file_name, file_path) 116 117 # Remove the tmp file. 118 os.remove(existing_file_2_tmp) 119 120 # Add a new FIFO 121 new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo') 122 os.mkfifo(new_fifo_path) 123 124 # Run the second time to do diff. 125 info.run(self.dest_dir, collect_init_status=False, collect_all=True) 126 127 # Validate files in dest_dir. 128 for file_name, file_path in zip(self.existing_files+self.new_files, 129 self.existing_files_path+self.new_files_path): 130 file_path = file_path.replace('src', 'dest') 131 with open(file_path, 'r') as f: 132 self.assertEqual(file_name, f.read()) 133 134 # Assert that FIFOs are not in the diff. 135 self.assertFalse( 136 os.path.exists(self.existing_fifo_path.replace('src', 'dest')), 137 msg='Existing FIFO present in diff sysinfo') 138 self.assertFalse( 139 os.path.exists(new_fifo_path.replace('src', 'dest')), 140 msg='New FIFO present in diff sysinfo') 141 142 # With collect_all=True, full sysinfo should also be present. 143 full_sysinfo_path = self.dest_dir + self.src_dir 144 self.assertTrue(os.path.exists(full_sysinfo_path), 145 msg='Full sysinfo not present') 146 147 # Assert that the full sysinfo is present. 148 self.assertNotEqual(self.src_dir, full_sysinfo_path) 149 self.assert_trees_equal(self.src_dir, full_sysinfo_path) 150 151 152class LogdirTestCase(unittest.TestCase): 153 """Tests logdir.run""" 154 155 def setUp(self): 156 self.tempdir = tempfile.mkdtemp() 157 self.addCleanup(shutil.rmtree, self.tempdir) 158 159 self.from_dir = os.path.join(self.tempdir, 'from') 160 self.to_dir = os.path.join(self.tempdir, 'to') 161 os.mkdir(self.from_dir) 162 os.mkdir(self.to_dir) 163 164 def _destination_path(self, relative_path, from_dir=None): 165 """The expected destination path for a subdir of the source directory""" 166 if from_dir is None: 167 from_dir = self.from_dir 168 return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path)) 169 170 def test_run_recreates_absolute_source_path(self): 171 """When copying files, the absolute path from the source is recreated 172 in the destination folder. 173 """ 174 os.mkdir(os.path.join(self.from_dir, 'fubar')) 175 logdir = site_sysinfo.logdir(self.from_dir) 176 logdir.run(self.to_dir) 177 destination_path= self._destination_path('fubar') 178 self.assertTrue(os.path.exists(destination_path), 179 msg='Failed to copy to %s' % destination_path) 180 181 def test_run_skips_symlinks(self): 182 os.mkdir(os.path.join(self.from_dir, 'real')) 183 os.symlink(os.path.join(self.from_dir, 'real'), 184 os.path.join(self.from_dir, 'symlink')) 185 186 logdir = site_sysinfo.logdir(self.from_dir) 187 logdir.run(self.to_dir) 188 189 destination_path_real = self._destination_path('real') 190 self.assertTrue(os.path.exists(destination_path_real), 191 msg='real directory was not copied to %s' % 192 destination_path_real) 193 destination_path_link = self._destination_path('symlink') 194 self.assertFalse( 195 os.path.exists(destination_path_link), 196 msg='symlink was copied to %s' % destination_path_link) 197 198 def test_run_resolves_symlinks_to_source_root(self): 199 """run tries hard to get to the source directory before copying. 200 201 Within the source folder, we skip any symlinks, but we first try to 202 resolve symlinks to the source root itself. 203 """ 204 os.mkdir(os.path.join(self.from_dir, 'fubar')) 205 from_symlink = os.path.join(self.tempdir, 'from_symlink') 206 os.symlink(self.from_dir, from_symlink) 207 208 logdir = site_sysinfo.logdir(from_symlink) 209 logdir.run(self.to_dir) 210 211 destination_path = self._destination_path('fubar') 212 self.assertTrue(os.path.exists(destination_path), 213 msg='Failed to copy to %s' % destination_path) 214 215 def test_run_excludes_common_patterns(self): 216 os.mkdir(os.path.join(self.from_dir, 'autoserv2344')) 217 # Create empty file. 218 open(os.path.join(self.from_dir, 'system.journal'), 'w').close() 219 deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix') 220 os.makedirs(os.path.join(self.from_dir, deeper_subdir)) 221 222 logdir = site_sysinfo.logdir(self.from_dir) 223 logdir.run(self.to_dir) 224 225 destination_path = self._destination_path('autoserv2344') 226 self.assertFalse(os.path.exists(destination_path), 227 msg='Copied banned file to %s' % destination_path) 228 destination_path = self._destination_path(deeper_subdir) 229 self.assertFalse(os.path.exists(destination_path), 230 msg='Copied banned file to %s' % destination_path) 231 destination_path = self._destination_path('system.journal') 232 self.assertFalse(os.path.exists(destination_path), 233 msg='Copied banned file to %s' % destination_path) 234 235 def test_run_ignores_exclude_patterns_in_leading_dirs(self): 236 """Exclude patterns should only be applied to path suffixes within 237 from_dir, not to the root directory itself. 238 """ 239 exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344') 240 os.makedirs(os.path.join(exclude_pattern_dir, 'fubar')) 241 logdir = site_sysinfo.logdir(exclude_pattern_dir) 242 logdir.run(self.to_dir) 243 destination_path = self._destination_path('fubar', 244 from_dir=exclude_pattern_dir) 245 self.assertTrue(os.path.exists(destination_path), 246 msg='Failed to copy to %s' % destination_path) 247 248 def test_pickle_unpickle_equal(self): 249 """Sanity check pickle-unpickle round-trip.""" 250 logdir = site_sysinfo.logdir( 251 self.from_dir, 252 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 253 # base_job uses protocol 2 to pickle. We follow suit. 254 logdir_pickle = pickle.dumps(logdir, protocol=2) 255 unpickled_logdir = pickle.loads(logdir_pickle) 256 257 self.assertEqual(unpickled_logdir, logdir) 258 259 def test_pickle_enforce_required_attributes(self): 260 """Some attributes from this object should never be dropped. 261 262 When running a client test against an older build, we pickle the objects 263 of this class from newer version of the class and unpicle to older 264 versions of the class. The older versions require some attributes to be 265 present. 266 """ 267 logdir = site_sysinfo.logdir( 268 self.from_dir, 269 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 270 # base_job uses protocol 2 to pickle. We follow suit. 271 logdir_pickle = pickle.dumps(logdir, protocol=2) 272 logdir = pickle.loads(logdir_pickle) 273 274 self.assertEqual(logdir.additional_exclude, 'a') 275 276 def test_pickle_enforce_required_attributes_default(self): 277 """Some attributes from this object should never be dropped. 278 279 When running a client test against an older build, we pickle the objects 280 of this class from newer version of the class and unpicle to older 281 versions of the class. The older versions require some attributes to be 282 present. 283 """ 284 logdir = site_sysinfo.logdir(self.from_dir) 285 # base_job uses protocol 2 to pickle. We follow suit. 286 logdir_pickle = pickle.dumps(logdir, protocol=2) 287 logdir = pickle.loads(logdir_pickle) 288 289 self.assertEqual(logdir.additional_exclude, None) 290 291 def test_unpickle_handle_missing__excludes(self): 292 """Sanely handle missing _excludes attribute from pickles 293 294 This can happen when running brand new version of this class that 295 introduced this attribute from older server side code in prod. 296 """ 297 logdir = site_sysinfo.logdir(self.from_dir) 298 delattr(logdir, '_excludes') 299 # base_job uses protocol 2 to pickle. We follow suit. 300 logdir_pickle = pickle.dumps(logdir, protocol=2) 301 logdir = pickle.loads(logdir_pickle) 302 303 self.assertEqual(logdir._excludes, 304 site_sysinfo.logdir.DEFAULT_EXCLUDES) 305 306 def test_unpickle_handle_missing__excludes_default(self): 307 """Sanely handle missing _excludes attribute from pickles 308 309 This can happen when running brand new version of this class that 310 introduced this attribute from older server side code in prod. 311 """ 312 logdir = site_sysinfo.logdir( 313 self.from_dir, 314 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 315 delattr(logdir, '_excludes') 316 # base_job uses protocol 2 to pickle. We follow suit. 317 logdir_pickle = pickle.dumps(logdir, protocol=2) 318 logdir = pickle.loads(logdir_pickle) 319 320 self.assertEqual( 321 logdir._excludes, 322 (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 323 324 325if __name__ == '__main__': 326 unittest.main() 327