• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2
3"""Tests for site_sysinfo."""
4
5__author__ = 'dshi@google.com (Dan Shi)'
6
7import cPickle as pickle
8import filecmp
9import os
10import random
11import shutil
12import tempfile
13import unittest
14
15import common
16from autotest_lib.client.bin import site_sysinfo
17from autotest_lib.client.common_lib import autotemp
18
19
20class diffable_logdir_test(unittest.TestCase):
21    """Tests for methods in class diffable_logdir."""
22
23
24    def setUp(self):
25        """Initialize a temp direcotry with test files."""
26        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
27        self.src_dir = os.path.join(self.tempdir.name, 'src')
28        self.dest_dir = os.path.join(self.tempdir.name, 'dest')
29
30        self.existing_files = ['existing_file_'+str(i) for i in range(3)]
31        self.existing_files_folder = ['', 'sub', 'sub/sub2']
32        self.existing_files_path = [os.path.join(self.src_dir, folder, f)
33                                    for f,folder in zip(self.existing_files,
34                                                self.existing_files_folder)]
35        self.new_files = ['new_file_'+str(i) for i in range(2)]
36        self.new_files_folder = ['sub', 'sub/sub3']
37        self.new_files_path = [os.path.join(self.src_dir, folder, f)
38                                    for f,folder in zip(self.new_files,
39                                                self.new_files_folder)]
40
41        # Create some file with random data in source directory.
42        for p in self.existing_files_path:
43            self.append_text_to_file(str(random.random()), p)
44
45        self.existing_fifo_path = os.path.join(
46            self.src_dir,'sub/sub2/existing_fifo')
47        os.mkfifo(self.existing_fifo_path)
48
49
50    def tearDown(self):
51        """Clearn up."""
52        self.tempdir.clean()
53
54
55    def append_text_to_file(self, text, file_path):
56        """Append text to the end of a file, create the file if not existed.
57
58        @param text: text to be appended to a file.
59        @param file_path: path to the file.
60
61        """
62        dir_name = os.path.dirname(file_path)
63        if not os.path.exists(dir_name):
64            os.makedirs(dir_name)
65        with open(file_path, 'a') as f:
66            f.write(text)
67
68
69    def assert_trees_equal(self, dir1, dir2, ignore=None):
70        """Assert two directory trees contain the same files.
71
72        @param dir1: the left comparison directory.
73        @param dir2: the right comparison directory.
74        @param ignore: filenames to ignore (in any directory).
75
76        """
77        dircmps = []
78        dircmps.append(filecmp.dircmp(dir1, dir2, ignore))
79        while dircmps:
80            dcmp = dircmps.pop()
81            self.assertEqual(dcmp.left_list, dcmp.right_list)
82            self.assertEqual([], dcmp.diff_files)
83            dircmps.extend(dcmp.subdirs.values())
84
85
86    def test_diffable_logdir_success(self):
87        """Test the diff function to save new data from a directory."""
88        info = site_sysinfo.diffable_logdir(self.src_dir,
89                                            keep_file_hierarchy=False,
90                                            append_diff_in_name=False)
91        # Run the first time to collect file status.
92        info.run(log_dir=None, collect_init_status=True)
93
94        # Add new files to the test directory.
95        for file_name, file_path in zip(self.new_files,
96                                         self.new_files_path):
97            self.append_text_to_file(file_name, file_path)
98
99        # Temp file for existing_file_2, used to hold on the inode. If the
100        # file is deleted and recreated, its inode might not change.
101        existing_file_2 = self.existing_files_path[2]
102        existing_file_2_tmp =  existing_file_2 + '_tmp'
103        os.rename(existing_file_2, existing_file_2_tmp)
104
105        # Append data to existing file.
106        for file_name, file_path in zip(self.existing_files,
107                                         self.existing_files_path):
108            self.append_text_to_file(file_name, file_path)
109
110        # Remove the tmp file.
111        os.remove(existing_file_2_tmp)
112
113        # Add a new FIFO
114        new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
115        os.mkfifo(new_fifo_path)
116
117        # Run the second time to do diff.
118        info.run(self.dest_dir, collect_init_status=False, collect_all=True)
119
120        # Validate files in dest_dir.
121        for file_name, file_path in zip(self.existing_files+self.new_files,
122                                self.existing_files_path+self.new_files_path):
123            file_path = file_path.replace('src', 'dest')
124            with open(file_path, 'r') as f:
125                self.assertEqual(file_name, f.read())
126
127        # Assert that FIFOs are not in the diff.
128        self.assertFalse(
129                os.path.exists(self.existing_fifo_path.replace('src', 'dest')),
130                msg='Existing FIFO present in diff sysinfo')
131        self.assertFalse(
132                os.path.exists(new_fifo_path.replace('src', 'dest')),
133                msg='New FIFO present in diff sysinfo')
134
135        # With collect_all=True, full sysinfo should also be present.
136        full_sysinfo_path = self.dest_dir + self.src_dir
137        self.assertTrue(os.path.exists(full_sysinfo_path),
138                        msg='Full sysinfo not present')
139
140        # Assert that the full sysinfo is present.
141        self.assertNotEqual(self.src_dir, full_sysinfo_path)
142        self.assert_trees_equal(self.src_dir, full_sysinfo_path)
143
144
145class LogdirTestCase(unittest.TestCase):
146    """Tests logdir.run"""
147
148    def setUp(self):
149        self.tempdir = tempfile.mkdtemp()
150        self.addCleanup(shutil.rmtree, self.tempdir)
151
152        self.from_dir = os.path.join(self.tempdir, 'from')
153        self.to_dir = os.path.join(self.tempdir, 'to')
154        os.mkdir(self.from_dir)
155        os.mkdir(self.to_dir)
156
157    def _destination_path(self, relative_path, from_dir=None):
158        """The expected destination path for a subdir of the source directory"""
159        if from_dir is None:
160            from_dir = self.from_dir
161        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))
162
163    def test_run_recreates_absolute_source_path(self):
164        """When copying files, the absolute path from the source is recreated
165        in the destination folder.
166        """
167        os.mkdir(os.path.join(self.from_dir, 'fubar'))
168        logdir = site_sysinfo.logdir(self.from_dir)
169        logdir.run(self.to_dir)
170        destination_path= self._destination_path('fubar')
171        self.assertTrue(os.path.exists(destination_path),
172                        msg='Failed to copy to %s' % destination_path)
173
174    def test_run_skips_symlinks(self):
175        os.mkdir(os.path.join(self.from_dir, 'real'))
176        os.symlink(os.path.join(self.from_dir, 'real'),
177                   os.path.join(self.from_dir, 'symlink'))
178
179        logdir = site_sysinfo.logdir(self.from_dir)
180        logdir.run(self.to_dir)
181
182        destination_path_real = self._destination_path('real')
183        self.assertTrue(os.path.exists(destination_path_real),
184                        msg='real directory was not copied to %s' %
185                        destination_path_real)
186        destination_path_link = self._destination_path('symlink')
187        self.assertFalse(
188                os.path.exists(destination_path_link),
189                msg='symlink was copied to %s' % destination_path_link)
190
191    def test_run_resolves_symlinks_to_source_root(self):
192        """run tries hard to get to the source directory before copying.
193
194        Within the source folder, we skip any symlinks, but we first try to
195        resolve symlinks to the source root itself.
196        """
197        os.mkdir(os.path.join(self.from_dir, 'fubar'))
198        from_symlink = os.path.join(self.tempdir, 'from_symlink')
199        os.symlink(self.from_dir, from_symlink)
200
201        logdir = site_sysinfo.logdir(from_symlink)
202        logdir.run(self.to_dir)
203
204        destination_path = self._destination_path('fubar')
205        self.assertTrue(os.path.exists(destination_path),
206                        msg='Failed to copy to %s' % destination_path)
207
208    def test_run_excludes_common_patterns(self):
209        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
210        # Create empty file.
211        open(os.path.join(self.from_dir, 'system.journal'), 'w').close()
212        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
213        os.makedirs(os.path.join(self.from_dir, deeper_subdir))
214
215        logdir = site_sysinfo.logdir(self.from_dir)
216        logdir.run(self.to_dir)
217
218        destination_path = self._destination_path('autoserv2344')
219        self.assertFalse(os.path.exists(destination_path),
220                         msg='Copied banned file to %s' % destination_path)
221        destination_path = self._destination_path(deeper_subdir)
222        self.assertFalse(os.path.exists(destination_path),
223                         msg='Copied banned file to %s' % destination_path)
224        destination_path = self._destination_path('system.journal')
225        self.assertFalse(os.path.exists(destination_path),
226                         msg='Copied banned file to %s' % destination_path)
227
228    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
229        """Exclude patterns should only be applied to path suffixes within
230        from_dir, not to the root directory itself.
231        """
232        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
233        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
234        logdir = site_sysinfo.logdir(exclude_pattern_dir)
235        logdir.run(self.to_dir)
236        destination_path = self._destination_path('fubar',
237                                                  from_dir=exclude_pattern_dir)
238        self.assertTrue(os.path.exists(destination_path),
239                        msg='Failed to copy to %s' % destination_path)
240
241    def test_pickle_unpickle_equal(self):
242        """Sanity check pickle-unpickle round-trip."""
243        logdir = site_sysinfo.logdir(
244                self.from_dir,
245                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
246        # base_job uses protocol 2 to pickle. We follow suit.
247        logdir_pickle = pickle.dumps(logdir, protocol=2)
248        unpickled_logdir = pickle.loads(logdir_pickle)
249
250        self.assertEqual(unpickled_logdir, logdir)
251
252    def test_pickle_enforce_required_attributes(self):
253        """Some attributes from this object should never be dropped.
254
255        When running a client test against an older build, we pickle the objects
256        of this class from newer version of the class and unpicle to older
257        versions of the class. The older versions require some attributes to be
258        present.
259        """
260        logdir = site_sysinfo.logdir(
261                self.from_dir,
262                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
263        # base_job uses protocol 2 to pickle. We follow suit.
264        logdir_pickle = pickle.dumps(logdir, protocol=2)
265        logdir = pickle.loads(logdir_pickle)
266
267        self.assertEqual(logdir.additional_exclude, 'a')
268
269    def test_pickle_enforce_required_attributes_default(self):
270        """Some attributes from this object should never be dropped.
271
272        When running a client test against an older build, we pickle the objects
273        of this class from newer version of the class and unpicle to older
274        versions of the class. The older versions require some attributes to be
275        present.
276        """
277        logdir = site_sysinfo.logdir(self.from_dir)
278        # base_job uses protocol 2 to pickle. We follow suit.
279        logdir_pickle = pickle.dumps(logdir, protocol=2)
280        logdir = pickle.loads(logdir_pickle)
281
282        self.assertEqual(logdir.additional_exclude, None)
283
284    def test_unpickle_handle_missing__excludes(self):
285        """Sanely handle missing _excludes attribute from pickles
286
287        This can happen when running brand new version of this class that
288        introduced this attribute from older server side code in prod.
289        """
290        logdir = site_sysinfo.logdir(self.from_dir)
291        delattr(logdir, '_excludes')
292        # base_job uses protocol 2 to pickle. We follow suit.
293        logdir_pickle = pickle.dumps(logdir, protocol=2)
294        logdir = pickle.loads(logdir_pickle)
295
296        self.assertEqual(logdir._excludes,
297                         site_sysinfo.logdir.DEFAULT_EXCLUDES)
298
299    def test_unpickle_handle_missing__excludes_default(self):
300        """Sanely handle missing _excludes attribute from pickles
301
302        This can happen when running brand new version of this class that
303        introduced this attribute from older server side code in prod.
304        """
305        logdir = site_sysinfo.logdir(
306                self.from_dir,
307                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
308        delattr(logdir, '_excludes')
309        # base_job uses protocol 2 to pickle. We follow suit.
310        logdir_pickle = pickle.dumps(logdir, protocol=2)
311        logdir = pickle.loads(logdir_pickle)
312
313        self.assertEqual(
314                logdir._excludes,
315                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
316
317
318if __name__ == '__main__':
319    unittest.main()
320