import logging
import collections

from .case import _BaseTestCaseContext


# Pair of lists filled in lock-step by _CapturingHandler.emit():
#   records -- the raw LogRecord objects, in the order they were emitted
#   output  -- the same records rendered through the handler's formatter
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
                                         ["records", "output"])


class _CapturingHandler(logging.Handler):
    """
    A logging handler capturing all (raw and formatted) logging output.

    Everything handled is stored on ``self.watcher`` (a _LoggingWatcher)
    instead of being written anywhere.
    """

    def __init__(self):
        super().__init__()
        self.watcher = _LoggingWatcher([], [])

    def flush(self):
        # Records are only appended to in-memory lists; nothing to flush.
        pass

    def emit(self, record):
        """Store *record* and its formatted text on the watcher."""
        self.watcher.records.append(record)
        msg = self.format(record)
        self.watcher.output.append(msg)


class _AssertLogsContext(_BaseTestCaseContext):
    """A context manager used to implement TestCase.assertLogs().

    While the context is active, the target logger's handlers are replaced
    by a single _CapturingHandler, its level is set to the requested level
    and propagation is switched off.  The previous handlers, level and
    propagate flag are put back on exit.
    """

    # Format applied to every captured record to build ``watcher.output``.
    LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

    def __init__(self, test_case, logger_name, level):
        """Remember which logger to watch and at which level.

        *logger_name* may be a logging.Logger instance or a value accepted
        by logging.getLogger().  *level* may be a level name or a numeric
        level; any falsy value (None, 0, "") selects logging.INFO.
        """
        super().__init__(test_case)
        self.logger_name = logger_name
        if level:
            # Translate a level name to its number; a value that is not a
            # known name (e.g. an int) is kept unchanged.
            self.level = logging._nameToLevel.get(level, level)
        else:
            self.level = logging.INFO
        # NOTE(review): not read in this module; presumably consumed by
        # _BaseTestCaseContext when building the failure message -- confirm
        # against .case.
        self.msg = None

    def __enter__(self):
        """Install the capturing handler and return its watcher."""
        if isinstance(self.logger_name, logging.Logger):
            logger = self.logger = self.logger_name
        else:
            logger = self.logger = logging.getLogger(self.logger_name)
        formatter = logging.Formatter(self.LOGGING_FORMAT)
        handler = _CapturingHandler()
        handler.setFormatter(formatter)
        self.watcher = handler.watcher
        # Save the current configuration (copying the handler list) so that
        # __exit__ can restore it.
        self.old_handlers = logger.handlers[:]
        self.old_level = logger.level
        self.old_propagate = logger.propagate
        logger.handlers = [handler]
        logger.setLevel(self.level)
        logger.propagate = False
        return handler.watcher

    def __exit__(self, exc_type, exc_value, tb):
        """Restore the logger; report a failure if nothing was captured.

        The logger is restored first, whatever happened inside the block.
        If the block raised, False is returned so the exception propagates.
        Otherwise, an empty capture is reported through _raiseFailure()
        (inherited; not defined in this module).
        """
        self.logger.handlers = self.old_handlers
        self.logger.propagate = self.old_propagate
        self.logger.setLevel(self.old_level)
        if exc_type is not None:
            # let unexpected exceptions pass through
            return False
        if not self.watcher.records:
            self._raiseFailure(
                "no logs of level {} or higher triggered on {}"
                .format(logging.getLevelName(self.level), self.logger.name))