1 import logging
2 import collections
3
4 from .case import _BaseTestCaseContext
5
6
7 _LoggingWatcher = collections.namedtuple("_LoggingWatcher",
8 ["records", "output"])
9
10 class ESC[4;38;5;81m_CapturingHandler(ESC[4;38;5;149mloggingESC[4;38;5;149m.ESC[4;38;5;149mHandler):
11 """
12 A logging handler capturing all (raw and formatted) logging output.
13 """
14
15 def __init__(self):
16 logging.Handler.__init__(self)
17 self.watcher = _LoggingWatcher([], [])
18
19 def flush(self):
20 pass
21
22 def emit(self, record):
23 self.watcher.records.append(record)
24 msg = self.format(record)
25 self.watcher.output.append(msg)
26
27
28 class ESC[4;38;5;81m_AssertLogsContext(ESC[4;38;5;149m_BaseTestCaseContext):
29 """A context manager for assertLogs() and assertNoLogs() """
30
31 LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
32
33 def __init__(self, test_case, logger_name, level, no_logs):
34 _BaseTestCaseContext.__init__(self, test_case)
35 self.logger_name = logger_name
36 if level:
37 self.level = logging._nameToLevel.get(level, level)
38 else:
39 self.level = logging.INFO
40 self.msg = None
41 self.no_logs = no_logs
42
43 def __enter__(self):
44 if isinstance(self.logger_name, logging.Logger):
45 logger = self.logger = self.logger_name
46 else:
47 logger = self.logger = logging.getLogger(self.logger_name)
48 formatter = logging.Formatter(self.LOGGING_FORMAT)
49 handler = _CapturingHandler()
50 handler.setLevel(self.level)
51 handler.setFormatter(formatter)
52 self.watcher = handler.watcher
53 self.old_handlers = logger.handlers[:]
54 self.old_level = logger.level
55 self.old_propagate = logger.propagate
56 logger.handlers = [handler]
57 logger.setLevel(self.level)
58 logger.propagate = False
59 if self.no_logs:
60 return
61 return handler.watcher
62
63 def __exit__(self, exc_type, exc_value, tb):
64 self.logger.handlers = self.old_handlers
65 self.logger.propagate = self.old_propagate
66 self.logger.setLevel(self.old_level)
67
68 if exc_type is not None:
69 # let unexpected exceptions pass through
70 return False
71
72 if self.no_logs:
73 # assertNoLogs
74 if len(self.watcher.records) > 0:
75 self._raiseFailure(
76 "Unexpected logs found: {!r}".format(
77 self.watcher.output
78 )
79 )
80
81 else:
82 # assertLogs
83 if len(self.watcher.records) == 0:
84 self._raiseFailure(
85 "no logs of level {} or higher triggered on {}"
86 .format(logging.getLevelName(self.level), self.logger.name))