• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2
3import logging, os, select, StringIO, subprocess, sys, unittest
4import common
5from autotest_lib.client.common_lib import logging_manager, logging_config
6
7
8class PipedStringIO(object):
9    """
10    Like StringIO, but all I/O passes through a pipe.  This means a
11    PipedStringIO is backed by a file descriptor is thus can do things like
12    pass down to a subprocess.  However, this means the creating process must
13    call read_pipe() (or the classmethod read_all_pipes()) periodically to read
14    the pipe, and must call close() (or the classmethod cleanup()) to close the
15    pipe.
16    """
17    _instances = set()
18
19    def __init__(self):
20        self._string_io = StringIO.StringIO()
21        self._read_end, self._write_end = os.pipe()
22        PipedStringIO._instances.add(self)
23
24
25    def close(self):
26        self._string_io.close()
27        os.close(self._read_end)
28        os.close(self._write_end)
29        PipedStringIO._instances.remove(self)
30
31
32    def write(self, data):
33        os.write(self._write_end, data)
34
35
36    def flush(self):
37        pass
38
39
40    def fileno(self):
41        return self._write_end
42
43
44    def getvalue(self):
45        self.read_pipe()
46        return self._string_io.getvalue()
47
48
49    def read_pipe(self):
50        while True:
51            read_list, _, _ = select.select([self._read_end], [], [], 0)
52            if not read_list:
53                return
54            self._string_io.write(os.read(self._read_end, 1024))
55
56
57    @classmethod
58    def read_all_pipes(cls):
59        for instance in cls._instances:
60            instance.read_pipe()
61
62
63    @classmethod
64    def cleanup_all_instances(cls):
65        for instance in list(cls._instances):
66            instance.close()
67
68
69LOGGING_FORMAT = '%(levelname)s: %(message)s'
70
71_EXPECTED_STDOUT = """\
72print 1
73system 1
74INFO: logging 1
75INFO: print 2
76INFO: system 2
77INFO: logging 2
78INFO: print 6
79INFO: system 6
80INFO: logging 6
81print 7
82system 7
83INFO: logging 7
84"""
85
86_EXPECTED_LOG1 = """\
87INFO: print 3
88INFO: system 3
89INFO: logging 3
90INFO: print 4
91INFO: system 4
92INFO: logging 4
93INFO: print 5
94INFO: system 5
95INFO: logging 5
96"""
97
98_EXPECTED_LOG2 = """\
99INFO: print 4
100INFO: system 4
101INFO: logging 4
102"""
103
104
105class DummyLoggingConfig(logging_config.LoggingConfig):
106    console_formatter = logging.Formatter(LOGGING_FORMAT)
107
108    def __init__(self):
109        super(DummyLoggingConfig, self).__init__()
110        self.log = PipedStringIO()
111
112
113    def add_debug_file_handlers(self, log_dir, log_name=None):
114        self.logger.addHandler(logging.StreamHandler(self.log))
115
116
117# this isn't really a unit test since it creates subprocesses and pipes and all
118# that. i just use the unittest framework because it's convenient.
119class LoggingManagerTest(unittest.TestCase):
120    def setUp(self):
121        self.stdout = PipedStringIO()
122        self._log1 = PipedStringIO()
123        self._log2 = PipedStringIO()
124
125        self._real_system_calls = False
126
127        # the LoggingManager will change self.stdout (by design), so keep a
128        # copy around
129        self._original_stdout = self.stdout
130
131        # clear out import-time logging config and reconfigure
132        root_logger = logging.getLogger()
133        for handler in list(root_logger.handlers):
134            root_logger.removeHandler(handler)
135        # use INFO to ignore debug output from logging_manager itself
136        logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT,
137                            stream=self.stdout)
138
139        self._config_object = DummyLoggingConfig()
140        logging_manager.LoggingManager.logging_config_object = (
141                self._config_object)
142
143
144    def tearDown(self):
145        PipedStringIO.cleanup_all_instances()
146
147
148    def _say(self, suffix):
149        print >>self.stdout, 'print %s' % suffix
150        if self._real_system_calls:
151            os.system('echo system %s >&%s' % (suffix,
152                                               self._original_stdout.fileno()))
153        else:
154            print >>self.stdout, 'system %s' % suffix
155        logging.info('logging %s', suffix)
156        PipedStringIO.read_all_pipes()
157
158
159    def _setup_manager(self, manager_class=logging_manager.LoggingManager):
160        def set_stdout(file_object):
161            self.stdout = file_object
162        manager = manager_class()
163        manager.manage_stream(self.stdout, logging.INFO, set_stdout)
164        return manager
165
166
167    def _run_test(self, manager_class):
168        manager = self._setup_manager(manager_class)
169
170        self._say(1)
171
172        manager.start_logging()
173        self._say(2)
174
175        manager.redirect_to_stream(self._log1)
176        self._say(3)
177
178        manager.tee_redirect_to_stream(self._log2)
179        self._say(4)
180
181        manager.undo_redirect()
182        self._say(5)
183
184        manager.undo_redirect()
185        self._say(6)
186
187        manager.stop_logging()
188        self._say(7)
189
190
191    def _grab_fd_info(self):
192        command = 'ls -l /proc/%s/fd' % os.getpid()
193        proc = subprocess.Popen(command.split(), shell=True,
194                                stdout=subprocess.PIPE)
195        return proc.communicate()[0]
196
197
198    def _compare_logs(self, log_buffer, expected_text):
199        actual_lines = log_buffer.getvalue().splitlines()
200        expected_lines = expected_text.splitlines()
201        if self._real_system_calls:
202            # because of the many interacting processes, we can't ensure perfect
203            # interleaving.  so compare sets of lines rather than ordered lines.
204            actual_lines = set(actual_lines)
205            expected_lines = set(expected_lines)
206        self.assertEquals(actual_lines, expected_lines)
207
208
209    def _check_results(self):
210        # ensure our stdout was restored
211        self.assertEquals(self.stdout, self._original_stdout)
212
213        if self._real_system_calls:
214            # ensure FDs were left in their original state
215            self.assertEquals(self._grab_fd_info(), self._fd_info)
216
217        self._compare_logs(self.stdout, _EXPECTED_STDOUT)
218        self._compare_logs(self._log1, _EXPECTED_LOG1)
219        self._compare_logs(self._log2, _EXPECTED_LOG2)
220
221
222    def test_logging_manager(self):
223        self._run_test(logging_manager.LoggingManager)
224        self._check_results()
225
226
227    def test_fd_redirection_logging_manager(self):
228        self._real_system_calls = True
229        self._fd_info = self._grab_fd_info()
230        self._run_test(logging_manager.FdRedirectionLoggingManager)
231        self._check_results()
232
233
234    def test_tee_redirect_debug_dir(self):
235        manager = self._setup_manager()
236        manager.start_logging()
237
238        manager.tee_redirect_debug_dir('/fake/dir', tag='mytag')
239        print >>self.stdout, 'hello'
240
241        manager.undo_redirect()
242        print >>self.stdout, 'goodbye'
243
244        manager.stop_logging()
245
246        self._compare_logs(self.stdout,
247                           'INFO: mytag : hello\nINFO: goodbye')
248        self._compare_logs(self._config_object.log, 'hello\n')
249
250
251class MonkeyPatchTestCase(unittest.TestCase):
252    def setUp(self):
253        filename = os.path.split(__file__)[1]
254        if filename.endswith('.pyc'):
255            filename = filename[:-1]
256        self.expected_filename = filename
257
258
259    def check_filename(self, filename, expected=None):
260        if expected is None:
261            expected = [self.expected_filename]
262        self.assertIn(os.path.split(filename)[1], expected)
263
264
265    def _0_test_find_caller(self):
266        finder = logging_manager._logging_manager_aware_logger__find_caller
267        filename, lineno, caller_name = finder(logging_manager.logger)
268        self.check_filename(filename)
269        self.assertEquals('test_find_caller', caller_name)
270
271
272    def _1_test_find_caller(self):
273        self._0_test_find_caller()
274
275
276    def test_find_caller(self):
277        self._1_test_find_caller()
278
279
280    def _0_test_non_reported_find_caller(self):
281        finder = logging_manager._logging_manager_aware_logger__find_caller
282        filename, lineno, caller_name = finder(logging_manager.logger)
283        # Python 2.4 unittest implementation will call the unittest method in
284        # file 'unittest.py', and Python >= 2.6 does the same in 'case.py'
285        self.check_filename(filename, expected=['unittest.py', 'case.py'])
286
287
288    def _1_test_non_reported_find_caller(self):
289        self._0_test_non_reported_find_caller()
290
291
292    @logging_manager.do_not_report_as_logging_caller
293    def test_non_reported_find_caller(self):
294        self._1_test_non_reported_find_caller()
295
296
297
298if __name__ == '__main__':
299    unittest.main()
300