• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import logging
2import collections
3
4from .case import _BaseTestCaseContext
5
6
7_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
8                                         ["records", "output"])
9
10class _CapturingHandler(logging.Handler):
11    """
12    A logging handler capturing all (raw and formatted) logging output.
13    """
14
15    def __init__(self):
16        logging.Handler.__init__(self)
17        self.watcher = _LoggingWatcher([], [])
18
19    def flush(self):
20        pass
21
22    def emit(self, record):
23        self.watcher.records.append(record)
24        msg = self.format(record)
25        self.watcher.output.append(msg)
26
27
28class _AssertLogsContext(_BaseTestCaseContext):
29    """A context manager for assertLogs() and assertNoLogs() """
30
31    LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
32
33    def __init__(self, test_case, logger_name, level, no_logs):
34        _BaseTestCaseContext.__init__(self, test_case)
35        self.logger_name = logger_name
36        if level:
37            self.level = logging._nameToLevel.get(level, level)
38        else:
39            self.level = logging.INFO
40        self.msg = None
41        self.no_logs = no_logs
42
43    def __enter__(self):
44        if isinstance(self.logger_name, logging.Logger):
45            logger = self.logger = self.logger_name
46        else:
47            logger = self.logger = logging.getLogger(self.logger_name)
48        formatter = logging.Formatter(self.LOGGING_FORMAT)
49        handler = _CapturingHandler()
50        handler.setLevel(self.level)
51        handler.setFormatter(formatter)
52        self.watcher = handler.watcher
53        self.old_handlers = logger.handlers[:]
54        self.old_level = logger.level
55        self.old_propagate = logger.propagate
56        logger.handlers = [handler]
57        logger.setLevel(self.level)
58        logger.propagate = False
59        if self.no_logs:
60            return
61        return handler.watcher
62
63    def __exit__(self, exc_type, exc_value, tb):
64        self.logger.handlers = self.old_handlers
65        self.logger.propagate = self.old_propagate
66        self.logger.setLevel(self.old_level)
67
68        if exc_type is not None:
69            # let unexpected exceptions pass through
70            return False
71
72        if self.no_logs:
73            # assertNoLogs
74            if len(self.watcher.records) > 0:
75                self._raiseFailure(
76                    "Unexpected logs found: {!r}".format(
77                        self.watcher.output
78                    )
79                )
80
81        else:
82            # assertLogs
83            if len(self.watcher.records) == 0:
84                self._raiseFailure(
85                    "no logs of level {} or higher triggered on {}"
86                    .format(logging.getLevelName(self.level), self.logger.name))
87