• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2
3"""Tests for site_sysinfo."""
4
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import print_function
8
9
10__author__ = 'dshi@google.com (Dan Shi)'
11
12import six.moves.cPickle as pickle
13import filecmp
14import os
15import random
16import shutil
17import tempfile
18import unittest
19
20import common
21from autotest_lib.client.bin import site_sysinfo
22from autotest_lib.client.common_lib import autotemp
23from six.moves import range
24from six.moves import zip
25
26
27class diffable_logdir_test(unittest.TestCase):
28    """Tests for methods in class diffable_logdir."""
29
30
31    def setUp(self):
32        """Initialize a temp direcotry with test files."""
33        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
34        self.src_dir = os.path.join(self.tempdir.name, 'src')
35        self.dest_dir = os.path.join(self.tempdir.name, 'dest')
36
37        self.existing_files = ['existing_file_'+str(i) for i in range(3)]
38        self.existing_files_folder = ['', 'sub', 'sub/sub2']
39        self.existing_files_path = [os.path.join(self.src_dir, folder, f)
40                                    for f,folder in zip(self.existing_files,
41                                                self.existing_files_folder)]
42        self.new_files = ['new_file_'+str(i) for i in range(2)]
43        self.new_files_folder = ['sub', 'sub/sub3']
44        self.new_files_path = [os.path.join(self.src_dir, folder, f)
45                                    for f,folder in zip(self.new_files,
46                                                self.new_files_folder)]
47
48        # Create some file with random data in source directory.
49        for p in self.existing_files_path:
50            self.append_text_to_file(str(random.random()), p)
51
52        self.existing_fifo_path = os.path.join(
53            self.src_dir,'sub/sub2/existing_fifo')
54        os.mkfifo(self.existing_fifo_path)
55
56
57    def tearDown(self):
58        """Clearn up."""
59        self.tempdir.clean()
60
61
62    def append_text_to_file(self, text, file_path):
63        """Append text to the end of a file, create the file if not existed.
64
65        @param text: text to be appended to a file.
66        @param file_path: path to the file.
67
68        """
69        dir_name = os.path.dirname(file_path)
70        if not os.path.exists(dir_name):
71            os.makedirs(dir_name)
72        with open(file_path, 'a') as f:
73            f.write(text)
74
75
76    def assert_trees_equal(self, dir1, dir2, ignore=None):
77        """Assert two directory trees contain the same files.
78
79        @param dir1: the left comparison directory.
80        @param dir2: the right comparison directory.
81        @param ignore: filenames to ignore (in any directory).
82
83        """
84        dircmps = []
85        dircmps.append(filecmp.dircmp(dir1, dir2, ignore))
86        while dircmps:
87            dcmp = dircmps.pop()
88            self.assertEqual(dcmp.left_list, dcmp.right_list)
89            self.assertEqual([], dcmp.diff_files)
90            dircmps.extend(dcmp.subdirs.values())
91
92
93    def test_diffable_logdir_success(self):
94        """Test the diff function to save new data from a directory."""
95        info = site_sysinfo.diffable_logdir(self.src_dir,
96                                            keep_file_hierarchy=False,
97                                            append_diff_in_name=False)
98        # Run the first time to collect file status.
99        info.run(log_dir=None, collect_init_status=True)
100
101        # Add new files to the test directory.
102        for file_name, file_path in zip(self.new_files,
103                                         self.new_files_path):
104            self.append_text_to_file(file_name, file_path)
105
106        # Temp file for existing_file_2, used to hold on the inode. If the
107        # file is deleted and recreated, its inode might not change.
108        existing_file_2 = self.existing_files_path[2]
109        existing_file_2_tmp =  existing_file_2 + '_tmp'
110        os.rename(existing_file_2, existing_file_2_tmp)
111
112        # Append data to existing file.
113        for file_name, file_path in zip(self.existing_files,
114                                         self.existing_files_path):
115            self.append_text_to_file(file_name, file_path)
116
117        # Remove the tmp file.
118        os.remove(existing_file_2_tmp)
119
120        # Add a new FIFO
121        new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
122        os.mkfifo(new_fifo_path)
123
124        # Run the second time to do diff.
125        info.run(self.dest_dir, collect_init_status=False, collect_all=True)
126
127        # Validate files in dest_dir.
128        for file_name, file_path in zip(self.existing_files+self.new_files,
129                                self.existing_files_path+self.new_files_path):
130            file_path = file_path.replace('src', 'dest')
131            with open(file_path, 'r') as f:
132                self.assertEqual(file_name, f.read())
133
134        # Assert that FIFOs are not in the diff.
135        self.assertFalse(
136                os.path.exists(self.existing_fifo_path.replace('src', 'dest')),
137                msg='Existing FIFO present in diff sysinfo')
138        self.assertFalse(
139                os.path.exists(new_fifo_path.replace('src', 'dest')),
140                msg='New FIFO present in diff sysinfo')
141
142        # With collect_all=True, full sysinfo should also be present.
143        full_sysinfo_path = self.dest_dir + self.src_dir
144        self.assertTrue(os.path.exists(full_sysinfo_path),
145                        msg='Full sysinfo not present')
146
147        # Assert that the full sysinfo is present.
148        self.assertNotEqual(self.src_dir, full_sysinfo_path)
149        self.assert_trees_equal(self.src_dir, full_sysinfo_path)
150
151
152class LogdirTestCase(unittest.TestCase):
153    """Tests logdir.run"""
154
155    def setUp(self):
156        self.tempdir = tempfile.mkdtemp()
157        self.addCleanup(shutil.rmtree, self.tempdir)
158
159        self.from_dir = os.path.join(self.tempdir, 'from')
160        self.to_dir = os.path.join(self.tempdir, 'to')
161        os.mkdir(self.from_dir)
162        os.mkdir(self.to_dir)
163
164    def _destination_path(self, relative_path, from_dir=None):
165        """The expected destination path for a subdir of the source directory"""
166        if from_dir is None:
167            from_dir = self.from_dir
168        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))
169
170    def test_run_recreates_absolute_source_path(self):
171        """When copying files, the absolute path from the source is recreated
172        in the destination folder.
173        """
174        os.mkdir(os.path.join(self.from_dir, 'fubar'))
175        logdir = site_sysinfo.logdir(self.from_dir)
176        logdir.run(self.to_dir)
177        destination_path= self._destination_path('fubar')
178        self.assertTrue(os.path.exists(destination_path),
179                        msg='Failed to copy to %s' % destination_path)
180
181    def test_run_skips_symlinks(self):
182        os.mkdir(os.path.join(self.from_dir, 'real'))
183        os.symlink(os.path.join(self.from_dir, 'real'),
184                   os.path.join(self.from_dir, 'symlink'))
185
186        logdir = site_sysinfo.logdir(self.from_dir)
187        logdir.run(self.to_dir)
188
189        destination_path_real = self._destination_path('real')
190        self.assertTrue(os.path.exists(destination_path_real),
191                        msg='real directory was not copied to %s' %
192                        destination_path_real)
193        destination_path_link = self._destination_path('symlink')
194        self.assertFalse(
195                os.path.exists(destination_path_link),
196                msg='symlink was copied to %s' % destination_path_link)
197
198    def test_run_resolves_symlinks_to_source_root(self):
199        """run tries hard to get to the source directory before copying.
200
201        Within the source folder, we skip any symlinks, but we first try to
202        resolve symlinks to the source root itself.
203        """
204        os.mkdir(os.path.join(self.from_dir, 'fubar'))
205        from_symlink = os.path.join(self.tempdir, 'from_symlink')
206        os.symlink(self.from_dir, from_symlink)
207
208        logdir = site_sysinfo.logdir(from_symlink)
209        logdir.run(self.to_dir)
210
211        destination_path = self._destination_path('fubar')
212        self.assertTrue(os.path.exists(destination_path),
213                        msg='Failed to copy to %s' % destination_path)
214
215    def test_run_excludes_common_patterns(self):
216        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
217        # Create empty file.
218        open(os.path.join(self.from_dir, 'system.journal'), 'w').close()
219        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
220        os.makedirs(os.path.join(self.from_dir, deeper_subdir))
221
222        logdir = site_sysinfo.logdir(self.from_dir)
223        logdir.run(self.to_dir)
224
225        destination_path = self._destination_path('autoserv2344')
226        self.assertFalse(os.path.exists(destination_path),
227                         msg='Copied banned file to %s' % destination_path)
228        destination_path = self._destination_path(deeper_subdir)
229        self.assertFalse(os.path.exists(destination_path),
230                         msg='Copied banned file to %s' % destination_path)
231        destination_path = self._destination_path('system.journal')
232        self.assertFalse(os.path.exists(destination_path),
233                         msg='Copied banned file to %s' % destination_path)
234
235    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
236        """Exclude patterns should only be applied to path suffixes within
237        from_dir, not to the root directory itself.
238        """
239        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
240        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
241        logdir = site_sysinfo.logdir(exclude_pattern_dir)
242        logdir.run(self.to_dir)
243        destination_path = self._destination_path('fubar',
244                                                  from_dir=exclude_pattern_dir)
245        self.assertTrue(os.path.exists(destination_path),
246                        msg='Failed to copy to %s' % destination_path)
247
248    def test_pickle_unpickle_equal(self):
249        """Sanity check pickle-unpickle round-trip."""
250        logdir = site_sysinfo.logdir(
251                self.from_dir,
252                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
253        # base_job uses protocol 2 to pickle. We follow suit.
254        logdir_pickle = pickle.dumps(logdir, protocol=2)
255        unpickled_logdir = pickle.loads(logdir_pickle)
256
257        self.assertEqual(unpickled_logdir, logdir)
258
259    def test_pickle_enforce_required_attributes(self):
260        """Some attributes from this object should never be dropped.
261
262        When running a client test against an older build, we pickle the objects
263        of this class from newer version of the class and unpicle to older
264        versions of the class. The older versions require some attributes to be
265        present.
266        """
267        logdir = site_sysinfo.logdir(
268                self.from_dir,
269                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
270        # base_job uses protocol 2 to pickle. We follow suit.
271        logdir_pickle = pickle.dumps(logdir, protocol=2)
272        logdir = pickle.loads(logdir_pickle)
273
274        self.assertEqual(logdir.additional_exclude, 'a')
275
276    def test_pickle_enforce_required_attributes_default(self):
277        """Some attributes from this object should never be dropped.
278
279        When running a client test against an older build, we pickle the objects
280        of this class from newer version of the class and unpicle to older
281        versions of the class. The older versions require some attributes to be
282        present.
283        """
284        logdir = site_sysinfo.logdir(self.from_dir)
285        # base_job uses protocol 2 to pickle. We follow suit.
286        logdir_pickle = pickle.dumps(logdir, protocol=2)
287        logdir = pickle.loads(logdir_pickle)
288
289        self.assertEqual(logdir.additional_exclude, None)
290
291    def test_unpickle_handle_missing__excludes(self):
292        """Sanely handle missing _excludes attribute from pickles
293
294        This can happen when running brand new version of this class that
295        introduced this attribute from older server side code in prod.
296        """
297        logdir = site_sysinfo.logdir(self.from_dir)
298        delattr(logdir, '_excludes')
299        # base_job uses protocol 2 to pickle. We follow suit.
300        logdir_pickle = pickle.dumps(logdir, protocol=2)
301        logdir = pickle.loads(logdir_pickle)
302
303        self.assertEqual(logdir._excludes,
304                         site_sysinfo.logdir.DEFAULT_EXCLUDES)
305
306    def test_unpickle_handle_missing__excludes_default(self):
307        """Sanely handle missing _excludes attribute from pickles
308
309        This can happen when running brand new version of this class that
310        introduced this attribute from older server side code in prod.
311        """
312        logdir = site_sysinfo.logdir(
313                self.from_dir,
314                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
315        delattr(logdir, '_excludes')
316        # base_job uses protocol 2 to pickle. We follow suit.
317        logdir_pickle = pickle.dumps(logdir, protocol=2)
318        logdir = pickle.loads(logdir_pickle)
319
320        self.assertEqual(
321                logdir._excludes,
322                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
323
324
325if __name__ == '__main__':
326    unittest.main()
327