import logging
import collections

from .case import _BaseTestCaseContext


# Pair of parallel lists filled in by _CapturingHandler.emit():
# ``records`` holds the raw LogRecord objects, ``output`` the formatted text.
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
                                         ["records", "output"])


class _CapturingHandler(logging.Handler):
    """
    A logging handler capturing all (raw and formatted) logging output.

    Captured data is exposed through ``self.watcher``, a ``_LoggingWatcher``
    whose two lists grow by one entry per emitted record.
    """

    def __init__(self):
        super().__init__()
        self.watcher = _LoggingWatcher([], [])

    def flush(self):
        """Do nothing: records are kept in memory, there is nothing to flush."""
        pass

    def emit(self, record):
        """Store *record* and its formatted message in the watcher."""
        self.watcher.records.append(record)
        msg = self.format(record)
        self.watcher.output.append(msg)


class _AssertLogsContext(_BaseTestCaseContext):
    """A context manager for assertLogs() and assertNoLogs().

    On entry the target logger's handlers, level and propagate flag are
    saved and replaced by a single ``_CapturingHandler``; on exit they are
    restored and the captured records are checked.
    """

    LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

    def __init__(self, test_case, logger_name, level, no_logs):
        """Remember the target logger, the threshold level and the mode.

        *logger_name* may be a ``logging.Logger`` instance or a value passed
        to ``logging.getLogger()``.  *level* may be a level name or a
        numeric level; any falsy value selects ``logging.INFO``.  *no_logs*
        selects the assertNoLogs behaviour when true.
        """
        super().__init__(test_case)
        self.logger_name = logger_name
        if level:
            # Map a known level name to its number; anything else (for
            # example an int) is passed through unchanged.
            self.level = logging._nameToLevel.get(level, level)
        else:
            self.level = logging.INFO
        self.msg = None
        self.no_logs = no_logs

    def __enter__(self):
        """Install the capturing handler on the target logger.

        Returns the ``_LoggingWatcher`` collecting the output, or ``None``
        when running in *no_logs* mode.
        """
        if isinstance(self.logger_name, logging.Logger):
            logger = self.logger = self.logger_name
        else:
            logger = self.logger = logging.getLogger(self.logger_name)
        formatter = logging.Formatter(self.LOGGING_FORMAT)
        handler = _CapturingHandler()
        handler.setLevel(self.level)
        handler.setFormatter(formatter)
        self.watcher = handler.watcher
        # Save the logger state so __exit__ can put it back.
        self.old_handlers = logger.handlers[:]
        self.old_level = logger.level
        self.old_propagate = logger.propagate
        logger.handlers = [handler]
        logger.setLevel(self.level)
        # Keep captured records from also reaching ancestor loggers.
        logger.propagate = False
        if self.no_logs:
            return
        return handler.watcher

    def __exit__(self, exc_type, exc_value, tb):
        """Restore the logger and check what was captured.

        Returns ``False`` when the block raised, so that the exception
        propagates.  Otherwise reports a failure through
        ``self._raiseFailure()`` (provided by the base class) if records
        were captured in *no_logs* mode, or if none were captured in the
        assertLogs mode.
        """
        self.logger.handlers = self.old_handlers
        self.logger.propagate = self.old_propagate
        self.logger.setLevel(self.old_level)

        if exc_type is not None:
            # let unexpected exceptions pass through
            return False

        if self.no_logs:
            # assertNoLogs
            if self.watcher.records:
                self._raiseFailure(
                    "Unexpected logs found: {!r}".format(
                        self.watcher.output
                    )
                )

        else:
            # assertLogs
            if not self.watcher.records:
                self._raiseFailure(
                    "no logs of level {} or higher triggered on {}"
                    .format(logging.getLevelName(self.level), self.logger.name))