• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Python part of the warnings subsystem."""
2
3import sys
4
5
# Names exported by a star-import of this module.
__all__ = ["warn", "warn_explicit", "showwarning",
           "formatwarning", "filterwarnings", "simplefilter",
           "resetwarnings", "catch_warnings"]
9
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Default hook that writes a warning to a file; replace it if you like.

    The arguments are bundled into a WarningMessage and handed to the
    built-in writer, which falls back to sys.stderr when 'file' is None.
    """
    _showwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, file, line))
14
def formatwarning(message, category, filename, lineno, line=None):
    """Return the standard text for a warning; replace it if you like.

    The arguments are bundled into a WarningMessage (with no output file)
    and rendered by the built-in formatter.
    """
    return _formatwarnmsg_impl(
        WarningMessage(message, category, filename, lineno, None, line))
19
def _showwarnmsg_impl(msg):
    """Format 'msg' and write it to msg.file, defaulting to sys.stderr.

    The warning is silently dropped when no stream is available or when
    writing to the stream raises OSError.
    """
    stream = sys.stderr if msg.file is None else msg.file
    if stream is None:
        # sys.stderr is None when run with pythonw.exe:
        # warnings get lost
        return
    formatted = _formatwarnmsg(msg)
    try:
        stream.write(formatted)
    except OSError:
        # the file (probably stderr) is invalid - this warning gets lost.
        pass
34
35def _formatwarnmsg_impl(msg):
36    category = msg.category.__name__
37    s =  f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
38
39    if msg.line is None:
40        try:
41            import linecache
42            line = linecache.getline(msg.filename, msg.lineno)
43        except Exception:
44            # When a warning is logged during Python shutdown, linecache
45            # and the import machinery don't work anymore
46            line = None
47            linecache = None
48    else:
49        line = msg.line
50    if line:
51        line = line.strip()
52        s += "  %s\n" % line
53
54    if msg.source is not None:
55        try:
56            import tracemalloc
57        # Logging a warning should not raise a new exception:
58        # catch Exception, not only ImportError and RecursionError.
59        except Exception:
60            # don't suggest to enable tracemalloc if it's not available
61            tracing = True
62            tb = None
63        else:
64            tracing = tracemalloc.is_tracing()
65            try:
66                tb = tracemalloc.get_object_traceback(msg.source)
67            except Exception:
68                # When a warning is logged during Python shutdown, tracemalloc
69                # and the import machinery don't work anymore
70                tb = None
71
72        if tb is not None:
73            s += 'Object allocated at (most recent call last):\n'
74            for frame in tb:
75                s += ('  File "%s", lineno %s\n'
76                      % (frame.filename, frame.lineno))
77
78                try:
79                    if linecache is not None:
80                        line = linecache.getline(frame.filename, frame.lineno)
81                    else:
82                        line = None
83                except Exception:
84                    line = None
85                if line:
86                    line = line.strip()
87                    s += '    %s\n' % line
88        elif not tracing:
89            s += (f'{category}: Enable tracemalloc to get the object '
90                  f'allocation traceback\n')
91    return s
92
# Keep a reference to check if the function was replaced
_showwarning_orig = showwarning

def _showwarnmsg(msg):
    """Emit 'msg' through showwarning() if it was replaced, else the default.

    A replacement hook receives the individual fields of 'msg' as
    positional arguments.  Raises TypeError when the replacement is not
    callable.
    """
    try:
        hook = showwarning
    except NameError:
        # The global name is gone (e.g. it was deleted): use the default.
        pass
    else:
        replaced = hook is not _showwarning_orig
        if replaced and not callable(hook):
            raise TypeError("warnings.showwarning() must be set to a "
                            "function or method")
        if replaced:
            # warnings.showwarning() was replaced
            hook(msg.message, msg.category, msg.filename, msg.lineno,
                 msg.file, msg.line)
            return
    _showwarnmsg_impl(msg)
113
# Keep a reference to check if the function was replaced
_formatwarning_orig = formatwarning

def _formatwarnmsg(msg):
    """Format 'msg' with formatwarning() if it was replaced, else the default.

    A replacement hook receives the message, category, filename, lineno
    and line fields of 'msg' as positional arguments.
    """
    try:
        hook = formatwarning
    except NameError:
        # The global name is gone (e.g. it was deleted): use the default.
        return _formatwarnmsg_impl(msg)
    if hook is _formatwarning_orig:
        return _formatwarnmsg_impl(msg)
    # warnings.formatwarning() was replaced
    return hook(msg.message, msg.category,
                msg.filename, msg.lineno, msg.line)
129
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
                 (compiled case-insensitively; "" matches everything)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
                ("" matches everything)
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    The arguments are validated with assert statements, so an invalid
    argument raises AssertionError.
    """
    assert action in ("error", "ignore", "always", "default", "module",
                      "once"), "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"

    # An empty pattern is stored as None, which matches anything; 're' is
    # only imported when there is something to compile.
    message_re = module_re = None
    if message or module:
        import re
        if message:
            message_re = re.compile(message, re.I)
        if module:
            module_re = re.compile(module)

    _add_filter(action, message_re, category, module_re, lineno,
                append=append)
164
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Insert a simple entry into the list of warnings filters (at the front).

    A simple filter has no message or module pattern, so it matches all
    modules and messages.
    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'category' -- a class that the warning must be a subclass of
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    'action' and 'lineno' are validated with assert statements.
    """
    valid_actions = ("error", "ignore", "always", "default", "module", "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"
    _add_filter(action, None, category, None, lineno, append=append)
180
def _add_filter(*item, append):
    """Add the filter tuple 'item' to the module-level filters list.

    With append false the filter is moved (or inserted) to the front;
    with append true it is added at the end unless already present.
    The filters-mutated hook is called in every case.
    """
    if append:
        # An existing duplicate keeps its position: do nothing.
        if item not in filters:
            filters.append(item)
    else:
        # Remove a possible duplicate so the new entry lands at the front.
        try:
            filters.remove(item)
        except ValueError:
            pass
        filters.insert(0, item)
    _filters_mutated()
194
def resetwarnings():
    """Empty the warnings filter list in place, leaving no active filters."""
    # Mutate the existing list object rather than rebinding the name.
    del filters[:]
    _filters_mutated()
199
200class _OptionError(Exception):
201    """Exception used by option processing helpers."""
202    pass
203
204# Helper to process -W options passed via sys.warnoptions
205def _processoptions(args):
206    for arg in args:
207        try:
208            _setoption(arg)
209        except _OptionError as msg:
210            print("Invalid -W option ignored:", msg, file=sys.stderr)
211
# Helper for _processoptions()
def _setoption(arg):
    """Parse one "action:message:category:module:lineno" option and install it.

    Missing trailing fields are treated as empty.  The message and module
    fields are taken literally (regex-escaped), and a non-empty module
    field gets a trailing '$' anchor.  Raises _OptionError for too many
    fields or an invalid line number; the action and category helpers
    raise it for their own fields.
    """
    import re
    fields = arg.split(':')
    if len(fields) > 5:
        raise _OptionError("too many fields (max 5): %r" % (arg,))
    # Pad with empty strings up to the five expected fields.
    fields.extend([''] * (5 - len(fields)))
    action, message, category, module, lineno = (
        field.strip() for field in fields)
    action = _getaction(action)
    message = re.escape(message)
    category = _getcategory(category)
    module = re.escape(module)
    if module:
        module = module + '$'
    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _OptionError("invalid lineno %r" % (lineno,)) from None
    filterwarnings(action, message, category, module, lineno)
238
239# Helper for _setoption()
240def _getaction(action):
241    if not action:
242        return "default"
243    if action == "all": return "always" # Alias
244    for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
245        if a.startswith(action):
246            return a
247    raise _OptionError("invalid action: %r" % (action,))
248
249# Helper for _setoption()
250def _getcategory(category):
251    import re
252    if not category:
253        return Warning
254    if re.match("^[a-zA-Z0-9_]+$", category):
255        try:
256            cat = eval(category)
257        except NameError:
258            raise _OptionError("unknown warning category: %r" % (category,)) from None
259    else:
260        i = category.rfind(".")
261        module = category[:i]
262        klass = category[i+1:]
263        try:
264            m = __import__(module, None, None, [klass])
265        except ImportError:
266            raise _OptionError("invalid module name: %r" % (module,)) from None
267        try:
268            cat = getattr(m, klass)
269        except AttributeError:
270            raise _OptionError("unknown warning category: %r" % (category,)) from None
271    if not issubclass(cat, Warning):
272        raise _OptionError("invalid warning category: %r" % (category,))
273    return cat
274
275
276def _is_internal_frame(frame):
277    """Signal whether the frame is an internal CPython implementation detail."""
278    filename = frame.f_code.co_filename
279    return 'importlib' in filename and '_bootstrap' in filename
280
281
282def _next_external_frame(frame):
283    """Find the next frame that doesn't involve CPython internals."""
284    frame = frame.f_back
285    while frame is not None and _is_internal_frame(frame):
286        frame = frame.f_back
287    return frame
288
289
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1, source=None):
    """Issue a warning, or maybe ignore it or raise an exception.

    'message' is the warning text or a Warning instance (whose class then
    overrides 'category').  'category' defaults to UserWarning.
    'stacklevel' selects the stack frame the warning is attributed to
    (1 is the caller of warn()), and 'source' is forwarded unchanged to
    warn_explicit().

    Raises TypeError when 'category' is not a Warning subclass.
    """
    if isinstance(message, Warning):
        # A Warning instance carries its own category.
        category = message.__class__
    elif category is None:
        category = UserWarning
    if not isinstance(category, type) or not issubclass(category, Warning):
        raise TypeError("category must be a Warning subclass, "
                        "not '{:s}'".format(type(category).__name__))
    # Get context information.  The sys._getframe() calls must stay in this
    # function: their depth arguments are relative to warn()'s own frame.
    try:
        if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
            # If frame is too small to care or if the warning originated in
            # internal code, then do not try to hide any frames.
            frame = sys._getframe(stacklevel)
        else:
            frame = sys._getframe(1)
            # Look for one frame less since the above line starts us off.
            for _ in range(stacklevel - 1):
                frame = _next_external_frame(frame)
                if frame is None:
                    raise ValueError
    except ValueError:
        # The stack is shallower than requested: attribute to "sys".
        frame_globals = sys.__dict__
        filename = "sys"
        lineno = 1
    else:
        frame_globals = frame.f_globals
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
    module = frame_globals.get('__name__', "<string>")
    registry = frame_globals.setdefault("__warningregistry__", {})
    warn_explicit(message, category, filename, lineno, module, registry,
                  frame_globals, source)
330
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None,
                  source=None):
    """Issue a warning with explicitly supplied context information.

    'message' -- the warning text, or a Warning instance (whose class then
                 replaces 'category')
    'category' -- the warning class, instantiated with the text when
                  'message' is not already a Warning
    'filename', 'lineno' -- the location the warning is attributed to
    'module' -- name matched against the filters' module patterns; defaults
                to 'filename' without a trailing ".py", or "<unknown>"
    'registry' -- dict recording already-shown warnings; a throwaway dict is
                  used when None
    'module_globals' -- passed to linecache.getlines() to prime the cache
    'source' -- stored as the 'source' attribute of the WarningMessage
                that is shown

    Raises the warning itself when the matching action is "error", and
    RuntimeError when the action is not a recognized one.
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3] # XXX What about leading pathname?
    if registry is None:
        registry = {}
    # A registry filled under an older filter configuration is stale.
    if registry.get('version', 0) != _filters_version:
        registry.clear()
        registry['version'] = _filters_version
    if isinstance(message, Warning):
        text = str(message)
        category = message.__class__
    else:
        text = message
        message = category(message)
    key = (text, category, lineno)
    # Quick test for common case
    if registry.get(key):
        return
    # Search the filters
    for item in filters:
        action, msg, cat, mod, ln = item
        if ((msg is None or msg.match(text)) and
            issubclass(category, cat) and
            (mod is None or mod.match(module)) and
            (ln == 0 or lineno == ln)):
            break
    else:
        action = defaultaction
    # Early exit actions
    if action == "ignore":
        return

    # Prime the linecache for formatting, in case the
    # "file" is actually in a zipfile or something.
    import linecache
    linecache.getlines(filename, module_globals)

    if action == "error":
        raise message
    # Other actions
    if action == "once":
        registry[key] = 1
        oncekey = (text, category)
        if onceregistry.get(oncekey):
            return
        onceregistry[oncekey] = 1
    elif action == "always":
        pass
    elif action == "module":
        registry[key] = 1
        altkey = (text, category, 0)
        if registry.get(altkey):
            return
        registry[altkey] = 1
    elif action == "default":
        registry[key] = 1
    else:
        # Unrecognized actions are errors
        raise RuntimeError(
              "Unrecognized action (%r) in warnings.filters:\n %s" %
              (action, item))
    # Print message and context.  'source' must be passed by keyword: the
    # fifth positional parameter of WarningMessage is 'file', so a positional
    # 'source' would be used as the output stream and msg.source would stay
    # None.
    msg = WarningMessage(message, category, filename, lineno, source=source)
    _showwarnmsg(msg)
400
401
class WarningMessage:
    """Record holding the details of a single warning.

    The attributes mirror the constructor arguments: message, category,
    filename, lineno, file, line and source.
    """

    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line", "source")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None, source=None):
        self.message, self.category = message, category
        self.filename, self.lineno = filename, lineno
        self.file, self.line, self.source = file, line, source
        # Category name cached for __str__(); None when no category is given.
        if category:
            self._category_name = category.__name__
        else:
            self._category_name = None

    def __str__(self):
        return (f"{{message : {self.message!r}, "
                f"category : {self._category_name!r}, "
                f"filename : {self.filename!r}, lineno : {self.lineno!s}, "
                f"line : {self.line!r}}}")
422
423
class catch_warnings:

    """Context manager that saves the warnings filters on entry and
    restores them on exit.

    On entry the module's filter list is replaced by a copy, so changes
    made inside the block are discarded when it ends; showwarning() and
    the internal writer are saved and restored as well.

    With record=True, warnings shown inside the block are appended to a
    list, which is what the 'with' statement binds; otherwise None is
    bound. The recorded objects have attributes that mirror the arguments
    to showwarning().

    The 'module' argument selects the module object to operate on instead
    of sys.modules['warnings']. This argument is only useful when testing
    the warnings module itself.

    """

    def __init__(self, *, record=False, module=None):
        """Store the options; nothing is changed until the context is entered.

        'record' enables capturing warnings in a list, and 'module' is an
        alternative to sys.modules['warnings'].  Both are keyword-only.

        """
        self._record = record
        if module is None:
            module = sys.modules['warnings']
        self._module = module
        self._entered = False

    def __repr__(self):
        shown = []
        if self._record:
            shown.append("record=True")
        if self._module is not sys.modules['warnings']:
            shown.append("module=%r" % self._module)
        return f"{type(self).__name__}({', '.join(shown)})"

    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True
        module = self._module
        # Save the live filter list and let the block work on a copy.
        self._filters = module.filters
        module.filters = self._filters[:]
        module._filters_mutated()
        self._showwarning = module.showwarning
        self._showwarnmsg_impl = module._showwarnmsg_impl
        if not self._record:
            return None
        log = []
        module._showwarnmsg_impl = log.append
        # Reset showwarning() to the default implementation to make sure
        # that _showwarnmsg() calls _showwarnmsg_impl()
        module.showwarning = module._showwarning_orig
        return log

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        module = self._module
        module.filters = self._filters
        module._filters_mutated()
        module.showwarning = self._showwarning
        module._showwarnmsg_impl = self._showwarnmsg_impl
488
489
# Private utility function called by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
    """Issue a RuntimeWarning saying that 'coro' was never awaited.

    When coro.cr_origin is not None, the message also lists where the
    coroutine was created, formatted like a traceback.
    """
    msg_lines = [
        f"coroutine '{coro.__qualname__}' was never awaited\n"
    ]
    origin = coro.cr_origin
    if origin is not None:
        import linecache
        import traceback
        # cr_origin entries are walked in reverse so the output reads
        # "most recent call last", with the source line added to each.
        entries = [
            (filename, lineno, funcname, linecache.getline(filename, lineno))
            for filename, lineno, funcname in reversed(origin)
        ]
        msg_lines.append("Coroutine created at (most recent call last)\n")
        msg_lines += traceback.format_list(entries)
    msg = "".join(msg_lines).rstrip("\n")
    # Passing source= here means that if the user happens to have tracemalloc
    # enabled and tracking where the coroutine was created, the warning will
    # contain that traceback. This does mean that if they have *both*
    # coroutine origin tracking *and* tracemalloc enabled, they'll get two
    # partially-redundant tracebacks. If we wanted to be clever we could
    # probably detect this case and avoid it, but for now we don't bother.
    warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
511
512
513# filters contains a sequence of filter 5-tuples
514# The components of the 5-tuple are:
515# - an action: error, ignore, always, default, module, or once
516# - a compiled regex that must match the warning message
517# - a class representing the warning category
518# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
try:
    # Prefer the implementation provided by the _warnings module; the
    # imported warn() and warn_explicit() replace the ones defined above.
    from _warnings import (filters, _defaultaction, _onceregistry,
                           warn, warn_explicit, _filters_mutated)
except ImportError:
    # _warnings is unavailable: set up the pure-Python state instead.
    filters = []
    defaultaction = "default"
    onceregistry = {}

    # Bumped on every change to the filters, so that registries filled
    # under an older configuration can be recognized as stale.
    _filters_version = 1

    def _filters_mutated():
        global _filters_version
        _filters_version += 1

    _warnings_defaults = False
else:
    defaultaction = _defaultaction
    onceregistry = _onceregistry
    _warnings_defaults = True
539
540
# Module initialization
_processoptions(sys.warnoptions)
# Several warning categories are ignored by default in regular builds
# (builds without sys.gettotalrefcount).  This is only done here when the
# pure-Python fallback state is in use.
if not _warnings_defaults and not hasattr(sys, 'gettotalrefcount'):
    filterwarnings("default", category=DeprecationWarning,
                   module="__main__", append=True)
    simplefilter("ignore", category=DeprecationWarning, append=True)
    simplefilter("ignore", category=PendingDeprecationWarning, append=True)
    simplefilter("ignore", category=ImportWarning, append=True)
    simplefilter("ignore", category=ResourceWarning, append=True)

del _warnings_defaults
554