• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""Python part of the warnings subsystem."""
2
3import sys
4
5
# Names exported by a star-import of this module.
__all__ = [
    "warn",
    "warn_explicit",
    "showwarning",
    "formatwarning",
    "filterwarnings",
    "simplefilter",
    "resetwarnings",
    "catch_warnings",
]
9
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Hook to write a warning to a file; replace if you like.

    The arguments are bundled into a WarningMessage, which is handed to the
    default writer _showwarnmsg_impl().
    """
    _showwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, file, line))
14
def formatwarning(message, category, filename, lineno, line=None):
    """Function to format a warning the standard way.

    The arguments are bundled into a WarningMessage (with no target file)
    and the text produced by _formatwarnmsg_impl() is returned.
    """
    return _formatwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, None, line))
19
def _showwarnmsg_impl(msg):
    """Format *msg* and write it to msg.file, defaulting to sys.stderr.

    Nothing is written when no stream is available, and an OSError raised
    by the stream's write() is ignored: in both cases the warning is lost.
    """
    stream = sys.stderr if msg.file is None else msg.file
    if stream is None:
        # sys.stderr is None when run with pythonw.exe:
        # warnings get lost
        return
    rendered = _formatwarnmsg(msg)
    try:
        stream.write(rendered)
    except OSError:
        # the file (probably stderr) is invalid - this warning gets lost.
        pass
34
def _formatwarnmsg_impl(msg):
    """Return the default text rendering of the warning *msg*.

    The first line is "filename:lineno: category: message".  It is followed
    by the stripped source line (msg.line, or the line looked up through
    linecache when msg.line is None) if one is available.  When msg.source
    is not None, the tracemalloc allocation traceback of that object is
    appended, or a hint to enable tracemalloc when it is not tracing.

    Failures while importing or using linecache/tracemalloc are swallowed:
    logging a warning should not raise a new exception.
    """
    category = msg.category.__name__
    s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"

    # Bind linecache unconditionally.  It used to be imported only when
    # msg.line was None, so with an explicit line the traceback loop below
    # referenced an unbound local and silently dropped every source line.
    try:
        import linecache
    except Exception:
        # When a warning is logged during Python shutdown, linecache
        # and the import machinery don't work anymore
        linecache = None

    line = msg.line
    if line is None and linecache is not None:
        try:
            line = linecache.getline(msg.filename, msg.lineno)
        except Exception:
            # linecache is unusable; do not try it again for the traceback.
            line = None
            linecache = None
    if line:
        s += "  %s\n" % line.strip()

    if msg.source is not None:
        try:
            import tracemalloc
        # Logging a warning should not raise a new exception:
        # catch Exception, not only ImportError and RecursionError.
        except Exception:
            # don't suggest to enable tracemalloc if it's not available
            tracing = True
            tb = None
        else:
            tracing = tracemalloc.is_tracing()
            try:
                tb = tracemalloc.get_object_traceback(msg.source)
            except Exception:
                # When a warning is logged during Python shutdown, tracemalloc
                # and the import machinery don't work anymore
                tb = None

        if tb is not None:
            s += 'Object allocated at (most recent call last):\n'
            for frame in tb:
                s += ('  File "%s", lineno %s\n'
                      % (frame.filename, frame.lineno))

                line = None
                if linecache is not None:
                    try:
                        line = linecache.getline(frame.filename, frame.lineno)
                    except Exception:
                        line = None
                if line:
                    s += '    %s\n' % line.strip()
        elif not tracing:
            s += (f'{category}: Enable tracemalloc to get the object '
                  f'allocation traceback\n')
    return s
92
# Remember the stock showwarning() so a replacement can be detected later.
_showwarning_orig = showwarning

def _showwarnmsg(msg):
    """Emit *msg* through a replaced showwarning(), else the default writer.

    If the module-level showwarning is no longer the original function, it
    is called with the fields of *msg*; a non-callable replacement raises
    TypeError.  Otherwise (or if the name is not bound at all)
    _showwarnmsg_impl() does the work.
    """
    try:
        hook = showwarning
    except NameError:
        # The global name is not bound: fall back to the default path.
        hook = _showwarning_orig

    if hook is _showwarning_orig:
        _showwarnmsg_impl(msg)
        return

    # warnings.showwarning() was replaced
    if not callable(hook):
        raise TypeError("warnings.showwarning() must be set to a "
                        "function or method")
    hook(msg.message, msg.category, msg.filename, msg.lineno,
         msg.file, msg.line)
113
# Remember the stock formatwarning() so a replacement can be detected later.
_formatwarning_orig = formatwarning

def _formatwarnmsg(msg):
    """Format *msg* through a replaced formatwarning(), else the default.

    If the module-level formatwarning is no longer the original function,
    its result for the fields of *msg* is returned.  Otherwise (or if the
    name is not bound at all) _formatwarnmsg_impl() produces the text.
    """
    try:
        formatter = formatwarning
    except NameError:
        # The global name is not bound: fall back to the default path.
        formatter = _formatwarning_orig

    if formatter is _formatwarning_orig:
        return _formatwarnmsg_impl(msg)

    # warnings.formatwarning() was replaced
    return formatter(msg.message, msg.category,
                     msg.filename, msg.lineno, msg.line)
129
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    Raises AssertionError when an argument fails validation.  A non-empty
    'message' is compiled case-insensitively and a non-empty 'module' is
    compiled as given; empty strings are stored as None.
    """
    # Validate with explicit raises instead of assert statements: asserts are
    # removed when Python runs with -O, which would let malformed filters
    # into the list.  AssertionError and the messages are kept so callers
    # observe the same exception as before.
    if action not in ("error", "ignore", "always", "default", "module",
                      "once"):
        raise AssertionError("invalid action: %r" % (action,))
    if not isinstance(message, str):
        raise AssertionError("message must be a string")
    if not isinstance(category, type):
        raise AssertionError("category must be a class")
    if not issubclass(category, Warning):
        raise AssertionError("category must be a Warning subclass")
    if not isinstance(module, str):
        raise AssertionError("module must be a string")
    if not (isinstance(lineno, int) and lineno >= 0):
        raise AssertionError("lineno must be an int >= 0")

    if message or module:
        # Imported lazily: only needed when there is something to compile.
        import re

    message = re.compile(message, re.I) if message else None
    module = re.compile(module) if module else None

    _add_filter(action, message, category, module, lineno, append=append)
164
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Insert a simple entry into the list of warnings filters (at the front).

    A simple filter matches all modules and messages.
    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'category' -- a class that the warning must be a subclass of
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    Raises AssertionError when 'action' or 'lineno' fails validation.
    """
    # Explicit raises instead of assert statements, so the checks still run
    # under -O.  AssertionError and the messages are kept for compatibility.
    if action not in ("error", "ignore", "always", "default", "module",
                      "once"):
        raise AssertionError("invalid action: %r" % (action,))
    if not (isinstance(lineno, int) and lineno >= 0):
        raise AssertionError("lineno must be an int >= 0")
    _add_filter(action, None, category, None, lineno, append=append)
180
def _add_filter(*item, append):
    """Place the filter tuple *item* into the global filters list.

    With append false, any existing equal entry is removed and *item* is
    inserted at the front.  With append true, *item* is added at the end
    unless an equal entry is already present.  _filters_mutated() is called
    in every case.
    """
    if append:
        # A duplicate already in the list keeps its position.
        if item not in filters:
            filters.append(item)
    else:
        # Drop a possible duplicate so the new entry lands at the front.
        try:
            filters.remove(item)
        except ValueError:
            pass
        filters.insert(0, item)
    _filters_mutated()
194
def resetwarnings():
    """Clear the list of warning filters, so that no filters are active."""
    # Empty the list in place so existing references see the change.
    del filters[:]
    _filters_mutated()
199
class _OptionError(Exception):
    """Raised by the -W option helpers when an option cannot be processed."""
203
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
    """Apply every option string in *args* through _setoption().

    An option rejected with _OptionError is reported on sys.stderr and
    skipped; processing continues with the next one.
    """
    for option in args:
        try:
            _setoption(option)
        except _OptionError as err:
            print("Invalid -W option ignored:", err, file=sys.stderr)
211
# Helper for _processoptions()
def _setoption(arg):
    """Parse one "action:message:category:module:lineno" string and install it.

    Missing trailing fields default to empty.  The message and module fields
    are regex-escaped (the module is additionally anchored with \\Z) before
    being passed to filterwarnings().  Raises _OptionError for too many
    fields or an invalid line number; the action and category fields are
    resolved by _getaction() and _getcategory().
    """
    fields = arg.split(':')
    if len(fields) > 5:
        raise _OptionError("too many fields (max 5): %r" % (arg,))
    # Pad to exactly five fields, then trim whitespace around each one.
    fields.extend([''] * (5 - len(fields)))
    action, message, category, module, lineno = (
        field.strip() for field in fields)

    action = _getaction(action)
    category = _getcategory(category)

    if message or module:
        import re
    if message:
        message = re.escape(message)
    if module:
        module = re.escape(module) + r'\Z'

    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _OptionError("invalid lineno %r" % (lineno,)) from None

    filterwarnings(action, message, category, module, lineno)
239
# Helper for _setoption()
def _getaction(action):
    """Expand an action field of a -W option to a full action name.

    An empty field means "default" and "all" is an alias for "always".
    Otherwise the first known action that starts with *action* is returned;
    _OptionError is raised when none does.
    """
    if not action:
        return "default"
    if action == "all":
        return "always"  # Alias
    known_actions = ('default', 'always', 'ignore', 'module', 'once', 'error')
    resolved = next(
        (name for name in known_actions if name.startswith(action)), None)
    if resolved is None:
        raise _OptionError("invalid action: %r" % (action,))
    return resolved
249
# Helper for _setoption()
def _getcategory(category):
    """Resolve the category field of a -W option to a Warning subclass.

    An empty field means Warning.  A name without a dot is looked up in
    builtins; a dotted name is split at the last dot into a module to import
    and an attribute to fetch from it.

    Raises _OptionError when the module cannot be imported, the attribute
    does not exist, or the resolved object is not a Warning subclass.
    """
    if not category:
        return Warning
    if '.' not in category:
        import builtins as m
        klass = category
    else:
        module, _, klass = category.rpartition('.')
        try:
            m = __import__(module, None, None, [klass])
        except (ImportError, ValueError):
            # ValueError covers an empty module name (a field like ".Foo"),
            # which __import__() rejects before any import is attempted.
            raise _OptionError("invalid module name: %r" % (module,)) from None
    try:
        cat = getattr(m, klass)
    except AttributeError:
        raise _OptionError("unknown warning category: %r" % (category,)) from None
    # issubclass() raises TypeError for non-classes (e.g. the builtin "len"),
    # so check for a class first and report it as an invalid option instead.
    if not (isinstance(cat, type) and issubclass(cat, Warning)):
        raise _OptionError("invalid warning category: %r" % (category,))
    return cat
270
271
def _is_internal_frame(frame):
    """Signal whether the frame is an internal CPython implementation detail.

    A frame counts as internal when the filename of its code object contains
    both 'importlib' and '_bootstrap'.
    """
    code_path = frame.f_code.co_filename
    return all(marker in code_path for marker in ('importlib', '_bootstrap'))
276
277
def _next_external_frame(frame):
    """Find the next frame that doesn't involve CPython internals.

    Walks the f_back chain starting at the caller of *frame* and returns the
    first frame that _is_internal_frame() rejects, or None if the chain ends.
    """
    candidate = frame.f_back
    while candidate is not None:
        if not _is_internal_frame(candidate):
            break
        candidate = candidate.f_back
    return candidate
284
285
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1, source=None):
    """Issue a warning, or maybe ignore it or raise an exception.

    If *message* is a Warning instance its class is used as the category;
    otherwise *category* defaults to UserWarning.  A category that is not a
    Warning subclass raises TypeError.  The location reported is that of the
    frame *stacklevel* levels up, and the actual work is delegated to
    warn_explicit().
    """
    # A Warning instance carries its own category.
    if isinstance(message, Warning):
        category = message.__class__
    if category is None:
        category = UserWarning
    if not isinstance(category, type) or not issubclass(category, Warning):
        raise TypeError("category must be a Warning subclass, "
                        "not '{:s}'".format(type(category).__name__))

    # Locate the frame the warning is attributed to.  The sys._getframe()
    # calls stay directly in this function so the depths are relative to it.
    try:
        if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
            # If frame is too small to care or if the warning originated in
            # internal code, then do not try to hide any frames.
            frame = sys._getframe(stacklevel)
        else:
            frame = sys._getframe(1)
            # Look for one frame less since the above line starts us off.
            for _ in range(stacklevel - 1):
                frame = _next_external_frame(frame)
                if frame is None:
                    raise ValueError
    except ValueError:
        # No such frame: attribute the warning to the sys module.
        frame_globals = sys.__dict__
        filename = "sys"
        lineno = 1
    else:
        frame_globals = frame.f_globals
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno

    if '__name__' in frame_globals:
        module = frame_globals['__name__']
    else:
        module = "<string>"
    registry = frame_globals.setdefault("__warningregistry__", {})
    warn_explicit(message, category, filename, lineno, module, registry,
                  frame_globals, source)
326
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None,
                  source=None):
    """Issue a warning for an explicitly given location.

    *message* is either the warning text or a Warning instance (whose class
    then replaces *category*).  *module* defaults to *filename* with a
    trailing ".py" removed.  *registry* records warnings already handled and
    is cleared whenever the filters version has changed.  The first matching
    entry of the global filters decides the action; defaultaction is used
    when none matches.

    Depending on the action the warning is ignored, raised as an exception,
    or passed on to _showwarnmsg() as a WarningMessage.  An unrecognized
    action raises RuntimeError.
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3] # XXX What about leading pathname?
    if registry is None:
        registry = {}
    if registry.get('version', 0) != _filters_version:
        # The filters changed since this registry was filled: start over.
        registry.clear()
        registry['version'] = _filters_version
    if isinstance(message, Warning):
        text = str(message)
        category = message.__class__
    else:
        text = message
        message = category(message)
    key = (text, category, lineno)
    # Quick test for common case
    if registry.get(key):
        return
    # Search the filters
    for item in filters:
        action, msg, cat, mod, ln = item
        if ((msg is None or msg.match(text)) and
            issubclass(category, cat) and
            (mod is None or mod.match(module)) and
            (ln == 0 or lineno == ln)):
            break
    else:
        action = defaultaction
    # Early exit actions
    if action == "ignore":
        return

    # Prime the linecache for formatting, in case the
    # "file" is actually in a zipfile or something.
    import linecache
    linecache.getlines(filename, module_globals)

    if action == "error":
        raise message
    # Other actions
    if action == "once":
        registry[key] = 1
        oncekey = (text, category)
        if onceregistry.get(oncekey):
            return
        onceregistry[oncekey] = 1
    elif action == "always":
        pass
    elif action == "module":
        registry[key] = 1
        altkey = (text, category, 0)
        if registry.get(altkey):
            return
        registry[altkey] = 1
    elif action == "default":
        registry[key] = 1
    else:
        # Unrecognized actions are errors
        raise RuntimeError(
              "Unrecognized action (%r) in warnings.filters:\n %s" %
              (action, item))
    # Print message and context.  source must be passed by keyword: the
    # fifth positional parameter of WarningMessage is 'file', so a positional
    # source would be used as the output stream and msg.source would stay
    # None.
    msg = WarningMessage(message, category, filename, lineno, source=source)
    _showwarnmsg(msg)
396
397
class WarningMessage:
    """Plain record of the details of a single warning."""

    # Names of the attributes set from the constructor arguments.
    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line", "source")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None, source=None):
        """Store every argument as the attribute of the same name."""
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.file = file
        self.line = line
        self.source = source
        # Cached for __str__; None when no category was given.
        self._category_name = category.__name__ if category else None

    def __str__(self):
        """Return a brace-wrapped summary of the main fields."""
        template = ("{message : %r, category : %r, filename : %r, "
                    "lineno : %s, line : %r}")
        values = (self.message, self._category_name,
                  self.filename, self.lineno, self.line)
        return template % values
418
419
class catch_warnings:

    """A context manager that copies and restores the warnings filter upon
    exiting the context.

    With record=True the context manager returns a list, and the module's
    _showwarnmsg_impl is replaced by that list's append method for the
    duration of the block, so the message objects it receives are collected
    there.  With record=False it returns None.

    The 'module' argument is to specify an alternative module to the module
    named 'warnings' and imported under that name. This argument is only useful
    when testing the warnings module itself.

    """

    def __init__(self, *, record=False, module=None):
        """Specify whether to record warnings and if an alternative module
        should be used other than sys.modules['warnings'].

        Both arguments are keyword-only.

        """
        self._record = record
        if module is None:
            module = sys.modules['warnings']
        self._module = module
        self._entered = False

    def __repr__(self):
        # Only list the arguments that differ from the defaults.
        shown = []
        if self._record:
            shown.append("record=True")
        if self._module is not sys.modules['warnings']:
            shown.append("module=%r" % self._module)
        return "%s(%s)" % (type(self).__name__, ", ".join(shown))

    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True

        module = self._module
        # Work on a copy of the filters; the original list is put back on exit.
        self._filters = module.filters
        module.filters = self._filters[:]
        module._filters_mutated()
        self._showwarning = module.showwarning
        self._showwarnmsg_impl = module._showwarnmsg_impl

        if not self._record:
            return None

        log = []
        module._showwarnmsg_impl = log.append
        # Reset showwarning() to the default implementation to make sure
        # that _showwarnmsg() calls _showwarnmsg_impl()
        module.showwarning = module._showwarning_orig
        return log

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        module = self._module
        module.filters = self._filters
        module._filters_mutated()
        module.showwarning = self._showwarning
        module._showwarnmsg_impl = self._showwarnmsg_impl
484
485
# Private utility function called by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
    """Issue a RuntimeWarning saying that *coro* was never awaited.

    When coro.cr_origin is available, a formatted "created at" traceback
    built from it is appended to the message.
    """
    parts = [f"coroutine '{coro.__qualname__}' was never awaited\n"]
    if coro.cr_origin is not None:
        import linecache, traceback
        # cr_origin is walked in reverse so the entries read
        # "most recent call last".
        entries = [
            (filename, lineno, funcname, linecache.getline(filename, lineno))
            for filename, lineno, funcname in reversed(coro.cr_origin)
        ]
        parts.append("Coroutine created at (most recent call last)\n")
        parts.extend(traceback.format_list(entries))
    text = "".join(parts).rstrip("\n")
    # Passing source= here means that if the user happens to have tracemalloc
    # enabled and tracking where the coroutine was created, the warning will
    # contain that traceback. This does mean that if they have *both*
    # coroutine origin tracking *and* tracemalloc enabled, they'll get two
    # partially-redundant tracebacks. If we wanted to be clever we could
    # probably detect this case and avoid it, but for now we don't bother.
    warn(text, category=RuntimeWarning, stacklevel=2, source=coro)
507
508
# filters contains a sequence of filter 5-tuples.
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
try:
    from _warnings import (filters, _defaultaction, _onceregistry,
                           warn, warn_explicit, _filters_mutated)
except ImportError:
    # No _warnings module: keep the state in this module and rely on the
    # Python implementations of warn() and warn_explicit() defined above.
    filters = []
    defaultaction = "default"
    onceregistry = {}

    _filters_version = 1

    def _filters_mutated():
        """Bump the filters version after the filters list has changed."""
        global _filters_version
        _filters_version += 1

    _warnings_defaults = False
else:
    # The imported names replace the Python versions defined above.
    defaultaction = _defaultaction
    onceregistry = _onceregistry
    _warnings_defaults = True
535
536
# Module initialization
_processoptions(sys.warnoptions)
# When the defaults did not come from _warnings, install them here; builds
# that expose sys.gettotalrefcount get no default filters.
if not (_warnings_defaults or hasattr(sys, 'gettotalrefcount')):
    # Several warning categories are ignored by default in regular builds
    filterwarnings("default", category=DeprecationWarning,
                   module="__main__", append=1)
    simplefilter("ignore", category=DeprecationWarning, append=1)
    simplefilter("ignore", category=PendingDeprecationWarning, append=1)
    simplefilter("ignore", category=ImportWarning, append=1)
    simplefilter("ignore", category=ResourceWarning, append=1)

del _warnings_defaults
550