• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Python part of the warnings subsystem."""
2
3import sys
4
5
# Names exported by ``from warnings import *``.
__all__ = [
    "warn",
    "warn_explicit",
    "showwarning",
    "formatwarning",
    "filterwarnings",
    "simplefilter",
    "resetwarnings",
    "catch_warnings",
]
9
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Default hook that writes a warning out; may be replaced by users.

    The details are bundled into a WarningMessage record and handed to the
    default writer.
    """
    _showwarnmsg_impl(
        WarningMessage(message, category, filename, lineno,
                       file=file, line=line))
14
def formatwarning(message, category, filename, lineno, line=None):
    """Return the standard text rendering of a warning.

    The details are bundled into a WarningMessage record (with no target
    file) and passed to the default formatter.
    """
    record = WarningMessage(message, category, filename, lineno,
                            file=None, line=line)
    return _formatwarnmsg_impl(record)
19
def _showwarnmsg_impl(msg):
    """Write the formatted warning to msg.file, defaulting to sys.stderr."""
    stream = msg.file
    if stream is None:
        stream = sys.stderr
    if stream is None:
        # sys.stderr is None when run with pythonw.exe:
        # there is nowhere to write, so the warning is dropped
        return
    rendered = _formatwarnmsg(msg)
    try:
        stream.write(rendered)
    except OSError:
        # The target (most likely stderr) is unusable; drop the warning
        # rather than raise from the warning machinery.
        pass
34
def _formatwarnmsg_impl(msg):
    """Build the standard multi-line text for a warning record.

    'msg' is expected to provide the attributes message, category,
    filename, lineno, line and source.

    The result always starts with "filename:lineno: Category: message".
    It is followed by the stripped source line (msg.line if given, else
    looked up through linecache).  When msg.source is set, the tracemalloc
    allocation traceback of that object is appended, or a hint to enable
    tracemalloc if it is available but not tracing.

    Formatting a warning must never raise because of a broken helper
    module, so every linecache/tracemalloc access is guarded.
    """
    category = msg.category.__name__
    s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"

    # linecache is needed for the warning's own line (when none was given)
    # and for the allocation traceback below.  Bind the name in both cases;
    # previously it was left unbound when msg.line was supplied, which
    # silently dropped the source lines of the allocation traceback.
    linecache = None
    if msg.line is None or msg.source is not None:
        try:
            import linecache
        except Exception:
            # When a warning is logged during Python shutdown, linecache
            # and the import machinery don't work anymore
            linecache = None

    line = msg.line
    if line is None and linecache is not None:
        try:
            line = linecache.getline(msg.filename, msg.lineno)
        except Exception:
            # Same shutdown scenario: stop using linecache altogether
            line = None
            linecache = None
    if line:
        line = line.strip()
        s += "  %s\n" % line

    if msg.source is not None:
        try:
            import tracemalloc
        # Logging a warning should not raise a new exception:
        # catch Exception, not only ImportError and RecursionError.
        except Exception:
            # don't suggest to enable tracemalloc if it's not available
            suggest_tracemalloc = False
            tb = None
        else:
            try:
                suggest_tracemalloc = not tracemalloc.is_tracing()
            except Exception:
                # tracemalloc is unusable: nothing sensible to suggest
                suggest_tracemalloc = False
            try:
                tb = tracemalloc.get_object_traceback(msg.source)
            except Exception:
                # When a warning is logged during Python shutdown, tracemalloc
                # and the import machinery don't work anymore
                tb = None

        if tb is not None:
            s += 'Object allocated at (most recent call last):\n'
            for frame in tb:
                s += ('  File "%s", lineno %s\n'
                      % (frame.filename, frame.lineno))

                line = None
                if linecache is not None:
                    try:
                        line = linecache.getline(frame.filename, frame.lineno)
                    except Exception:
                        line = None
                if line:
                    line = line.strip()
                    s += '    %s\n' % line
        elif suggest_tracemalloc:
            s += (f'{category}: Enable tracemalloc to get the object '
                  f'allocation traceback\n')
    return s
92
# Remember the default hook so a user replacement can be detected
_showwarning_orig = showwarning

def _showwarnmsg(msg):
    """Emit a warning record through the current showwarning() hook.

    A user-installed showwarning() is called with the classic positional
    arguments; otherwise the default writer is used directly.
    """
    unset = object()
    try:
        hook = showwarning
    except NameError:
        hook = unset

    if hook is unset or hook is _showwarning_orig:
        _showwarnmsg_impl(msg)
        return

    # warnings.showwarning() was replaced
    if not callable(hook):
        raise TypeError("warnings.showwarning() must be set to a "
                        "function or method")
    hook(msg.message, msg.category, msg.filename, msg.lineno,
         msg.file, msg.line)
113
# Remember the default formatter so a user replacement can be detected
_formatwarning_orig = formatwarning

def _formatwarnmsg(msg):
    """Format a warning record through the current formatwarning() hook.

    A user-installed formatwarning() is called with the classic positional
    arguments; otherwise the default formatter is used directly.
    """
    unset = object()
    try:
        formatter = formatwarning
    except NameError:
        formatter = unset

    if formatter is unset or formatter is _formatwarning_orig:
        return _formatwarnmsg_impl(msg)

    # warnings.formatwarning() was replaced
    return formatter(msg.message, msg.category,
                     msg.filename, msg.lineno, msg.line)
129
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
                 (compiled case-insensitively)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    Raises AssertionError when an argument is invalid.
    """
    # Validate with explicit raises instead of `assert` statements so the
    # checks are not stripped under `python -O`.  AssertionError and the
    # messages are kept so existing callers see the same failures.
    if action not in ("error", "ignore", "always", "default", "module",
                      "once"):
        raise AssertionError("invalid action: %r" % (action,))
    if not isinstance(message, str):
        raise AssertionError("message must be a string")
    if not isinstance(category, type):
        raise AssertionError("category must be a class")
    if not issubclass(category, Warning):
        raise AssertionError("category must be a Warning subclass")
    if not isinstance(module, str):
        raise AssertionError("module must be a string")
    if not (isinstance(lineno, int) and lineno >= 0):
        raise AssertionError("lineno must be an int >= 0")

    # Only pay for importing re when there is a pattern to compile.
    if message or module:
        import re

    # Empty patterns are stored as None, which matches anything.
    if message:
        message = re.compile(message, re.I)
    else:
        message = None
    if module:
        module = re.compile(module)
    else:
        module = None

    _add_filter(action, message, category, module, lineno, append=append)
164
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Insert a simple entry into the list of warnings filters (at the front).

    A simple filter matches all modules and messages.
    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'category' -- a class that the warning must be a subclass of
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters

    Raises AssertionError when 'action' or 'lineno' is invalid.
    """
    # Validate with explicit raises instead of `assert` statements so the
    # checks are not stripped under `python -O`.  AssertionError and the
    # messages are kept so existing callers see the same failures.
    if action not in ("error", "ignore", "always", "default", "module",
                      "once"):
        raise AssertionError("invalid action: %r" % (action,))
    if not (isinstance(lineno, int) and lineno >= 0):
        raise AssertionError("lineno must be an int >= 0")
    # No message/module patterns: None matches anything.
    _add_filter(action, None, category, None, lineno, append=append)
180
def _add_filter(*item, append):
    """Register the filter tuple 'item' without creating duplicates.

    With append false the filter is moved (or added) to the front of the
    list; with append true it is added at the end only if not yet present.
    The filters-mutated hook is called in every case.
    """
    if append:
        # An existing duplicate keeps its position.
        if item not in filters:
            filters.append(item)
    else:
        # Drop an existing copy so the new entry ends up first.
        if item in filters:
            filters.remove(item)
        filters.insert(0, item)
    _filters_mutated()
194
def resetwarnings():
    """Remove every warning filter, leaving no filters active."""
    # Empty the list in place so other references to it see the change.
    del filters[:]
    _filters_mutated()
199
class _OptionError(Exception):
    """Raised by the -W option helpers when an option cannot be parsed."""
203
# Apply the -W options collected in sys.warnoptions
def _processoptions(args):
    """Install a filter for each option string, reporting invalid ones.

    A malformed option is reported on sys.stderr and skipped; it does not
    stop the remaining options from being processed.
    """
    for option in args:
        try:
            _setoption(option)
        except _OptionError as problem:
            print("Invalid -W option ignored:", problem, file=sys.stderr)
211
# Parse one -W option for _processoptions()
def _setoption(arg):
    """Turn 'action:message:category:module:lineno' into a filter.

    Missing trailing fields default to empty.  Message and module are
    taken literally (regex-escaped); the module must match to its end.
    Raises _OptionError for too many fields or an invalid line number.
    """
    import re
    fields = arg.split(':')
    if len(fields) > 5:
        raise _OptionError("too many fields (max 5): %r" % (arg,))
    # Pad up to exactly five fields.
    fields.extend([''] * (5 - len(fields)))
    action, message, category, module, lineno = (
        field.strip() for field in fields)
    action = _getaction(action)
    message = re.escape(message)
    category = _getcategory(category)
    module = re.escape(module)
    if module:
        module += '$'
    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _OptionError("invalid lineno %r" % (lineno,)) from None
    filterwarnings(action, message, category, module, lineno)
238
# Resolve the action field for _setoption()
def _getaction(action):
    """Expand a (possibly abbreviated) action name to its full form.

    An empty value means "default", "all" is an alias for "always", and
    any prefix of a known action selects the first action it matches.
    Raises _OptionError when nothing matches.
    """
    if not action:
        return "default"
    if action == "all":
        return "always"  # Alias
    known = ('default', 'always', 'ignore', 'module', 'once', 'error')
    found = next((name for name in known if name.startswith(action)), None)
    if found is None:
        raise _OptionError("invalid action: %r" % (action,))
    return found
248
# Helper for _setoption()
def _getcategory(category):
    """Resolve the category field of a -W option to a Warning subclass.

    An empty value means Warning.  A name without a dot is looked up among
    the builtins; a dotted name is imported as 'module.Class'.

    Raises _OptionError when the module cannot be imported, the name does
    not exist, or the object found is not a Warning subclass.
    """
    if not category:
        return Warning
    if '.' not in category:
        # Look bare names up in builtins only.  eval() would resolve any
        # name in scope (functions, modules, ...) and run it through
        # issubclass(), which fails with an uncaught TypeError.
        import builtins as m
        klass = category
    else:
        module, _, klass = category.rpartition('.')
        try:
            m = __import__(module, None, None, [klass])
        except (ImportError, ValueError):
            # ValueError: empty module name, e.g. category ".Foo"
            raise _OptionError("invalid module name: %r" % (module,)) from None
    try:
        cat = getattr(m, klass)
    except AttributeError:
        raise _OptionError("unknown warning category: %r" % (category,)) from None
    # issubclass() raises TypeError for non-classes, so check the type first.
    if not (isinstance(cat, type) and issubclass(cat, Warning)):
        raise _OptionError("invalid warning category: %r" % (category,))
    return cat
274
275
def _is_internal_frame(frame):
    """Return True when the frame's file looks like importlib bootstrap code.

    The check is purely textual: both 'importlib' and '_bootstrap' must
    occur in the frame's code filename.
    """
    path = frame.f_code.co_filename
    return all(marker in path for marker in ('importlib', '_bootstrap'))
280
281
def _next_external_frame(frame):
    """Return the nearest caller frame that is not internal, or None.

    Starts at the frame's caller and keeps walking f_back past frames
    that _is_internal_frame() reports as internal.
    """
    candidate = frame.f_back
    while candidate is not None:
        if not _is_internal_frame(candidate):
            break
        candidate = candidate.f_back
    return candidate
288
289
# Pure-Python fallback; normally overridden by the _warnings version
def warn(message, category=None, stacklevel=1, source=None):
    """Issue a warning, or maybe ignore it or raise an exception.

    'message' -- warning text or a Warning instance (whose class then
                 becomes the category)
    'category' -- Warning subclass, UserWarning when omitted
    'stacklevel' -- how many frames up the reported location lies
    'source' -- the object that caused the warning, if any

    Raises TypeError when 'category' is not a Warning subclass.
    """
    # A Warning instance brings its own category
    if isinstance(message, Warning):
        category = message.__class__
    if category is None:
        category = UserWarning
    is_warning_class = (isinstance(category, type)
                        and issubclass(category, Warning))
    if not is_warning_class:
        raise TypeError("category must be a Warning subclass, "
                        "not '{:s}'".format(type(category).__name__))

    # Locate the frame the warning is attributed to.  The sys._getframe()
    # calls stay in this function body: their depth is relative to it.
    try:
        if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
            # Nothing to skip, or the warning comes from internal code:
            # take the requested frame without hiding anything.
            frame = sys._getframe(stacklevel)
        else:
            frame = sys._getframe(1)
            # One step fewer, since we already start at the caller.
            for _ in range(stacklevel - 1):
                frame = _next_external_frame(frame)
                if frame is None:
                    raise ValueError
    except ValueError:
        # Ran off the top of the stack: attribute the warning to sys.
        namespace = sys.__dict__
        lineno = 1
    else:
        namespace = frame.f_globals
        lineno = frame.f_lineno

    module = namespace.get('__name__', "<string>")
    filename = namespace.get('__file__')
    if filename:
        # Report the source file rather than the compiled one.
        if filename.lower().endswith(".pyc"):
            filename = filename[:-1]
    else:
        if module == "__main__":
            try:
                filename = sys.argv[0]
            except AttributeError:
                # embedded interpreters don't have sys.argv, see bug #839151
                filename = '__main__'
        if not filename:
            filename = module
    registry = namespace.setdefault("__warningregistry__", {})
    warn_explicit(message, category, filename, lineno, module, registry,
                  namespace, source)
342
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None,
                  source=None):
    """Low-level interface for issuing a warning with explicit context.

    'message' -- warning text, or a Warning instance (its class then
                 overrides 'category')
    'category' -- Warning subclass used to wrap a text message
    'filename', 'lineno' -- location reported for the warning
    'module' -- name matched against the module filters; derived from
                'filename' (minus a trailing ".py") when None
    'registry' -- dict recording warnings already shown; a throwaway
                  dict is used when None
    'module_globals' -- passed to linecache.getlines() to help locate
                        the source text
    'source' -- the object that caused the warning, if any

    Raises the warning itself when the selected action is "error", and
    RuntimeError when the selected action is not recognized.
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3] # XXX What about leading pathname?
    if registry is None:
        registry = {}
    # A registry filled under an older filter configuration is stale.
    if registry.get('version', 0) != _filters_version:
        registry.clear()
        registry['version'] = _filters_version
    if isinstance(message, Warning):
        text = str(message)
        category = message.__class__
    else:
        text = message
        message = category(message)
    key = (text, category, lineno)
    # Quick test for common case
    if registry.get(key):
        return
    # Search the filters; `item` is pre-bound so the error message below
    # cannot fail with an unbound name when the filter list is empty.
    item = None
    for item in filters:
        action, msg, cat, mod, ln = item
        if ((msg is None or msg.match(text)) and
            issubclass(category, cat) and
            (mod is None or mod.match(module)) and
            (ln == 0 or lineno == ln)):
            break
    else:
        action = defaultaction
    # Early exit actions
    if action == "ignore":
        return

    # Prime the linecache for formatting, in case the
    # "file" is actually in a zipfile or something.
    import linecache
    linecache.getlines(filename, module_globals)

    if action == "error":
        raise message
    # Other actions
    if action == "once":
        registry[key] = 1
        oncekey = (text, category)
        if onceregistry.get(oncekey):
            return
        onceregistry[oncekey] = 1
    elif action == "always":
        pass
    elif action == "module":
        registry[key] = 1
        altkey = (text, category, 0)
        if registry.get(altkey):
            return
        registry[altkey] = 1
    elif action == "default":
        registry[key] = 1
    else:
        # Unrecognized actions are errors
        raise RuntimeError(
              "Unrecognized action (%r) in warnings.filters:\n %s" %
              (action, item))
    # Print message and context.  `source` must go by keyword: the fifth
    # positional parameter of WarningMessage is `file`, so passing it
    # positionally would make the source object the output stream.
    msg = WarningMessage(message, category, filename, lineno, source=source)
    _showwarnmsg(msg)
412
413
class WarningMessage:
    """Record holding the details of a single warning."""

    # Names of the attributes stored on every record.
    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line", "source")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None, source=None):
        """Store the warning details as attributes of the same names."""
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.file = file
        self.line = line
        self.source = source
        # Cache the category's name for __str__; None without a category.
        if category:
            self._category_name = category.__name__
        else:
            self._category_name = None

    def __str__(self):
        """Return a brace-delimited summary of the main details."""
        details = (self.message, self._category_name,
                   self.filename, self.lineno, self.line)
        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
                "line : %r}" % details)
434
435
class catch_warnings:

    """Context manager that saves the warnings filter list on entry and
    restores it on exit.

    With 'record' true, warnings are captured instead of shown: they are
    appended to a list that the context manager returns.  Otherwise the
    context manager returns None.  The captured objects carry attributes
    that mirror the arguments to showwarning().

    The 'module' argument selects an alternative module to use in place
    of the one registered as 'warnings'.  It is only useful when testing
    the warnings module itself.

    """

    def __init__(self, *, record=False, module=None):
        """Choose whether to record warnings and which module to patch.

        Both arguments are keyword-only; 'module' defaults to
        sys.modules['warnings'].

        """
        self._record = record
        if module is None:
            module = sys.modules['warnings']
        self._module = module
        self._entered = False

    def __repr__(self):
        parts = []
        if self._record:
            parts.append("record=True")
        if self._module is not sys.modules['warnings']:
            parts.append("module=%r" % self._module)
        return "%s(%s)" % (type(self).__name__, ", ".join(parts))

    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True
        target = self._module
        # Work on a copy of the filters; the original is put back on exit.
        self._filters = target.filters
        target.filters = self._filters[:]
        target._filters_mutated()
        self._showwarning = target.showwarning
        self._showwarnmsg_impl = target._showwarnmsg_impl
        if not self._record:
            return None
        captured = []
        target._showwarnmsg_impl = captured.append
        # Reset showwarning() to the default implementation to make sure
        # that _showwarnmsg() calls _showwarnmsg_impl()
        target.showwarning = target._showwarning_orig
        return captured

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        target = self._module
        target.filters = self._filters
        target._filters_mutated()
        target.showwarning = self._showwarning
        target._showwarnmsg_impl = self._showwarnmsg_impl
500
501
# Private helper invoked by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
    """Warn (RuntimeWarning) that 'coro' was never awaited.

    When the coroutine has a recorded origin (cr_origin), the creation
    stack is appended to the message.
    """
    pieces = [f"coroutine '{coro.__qualname__}' was never awaited\n"]
    origin = coro.cr_origin
    if origin is not None:
        import linecache
        import traceback
        # cr_origin is walked in reverse to match the "most recent call
        # last" header below.
        entries = [
            (filename, lineno, funcname, linecache.getline(filename, lineno))
            for filename, lineno, funcname in reversed(origin)
        ]
        pieces.append("Coroutine created at (most recent call last)\n")
        pieces.extend(traceback.format_list(entries))
    text = "".join(pieces).rstrip("\n")
    # source= lets the formatter add the tracemalloc allocation traceback
    # when tracemalloc is tracing.  With both origin tracking and
    # tracemalloc enabled this yields two partly redundant tracebacks;
    # that case is deliberately not special-cased.
    warn(text, category=RuntimeWarning, stacklevel=2, source=coro)
523
524
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
# Prefer the C accelerator's state and functions; fall back to pure Python.
try:
    from _warnings import (filters, _defaultaction, _onceregistry,
                           warn, warn_explicit, _filters_mutated)
except ImportError:
    # No accelerator: keep the filter state in this module.
    filters = []
    defaultaction = "default"
    onceregistry = {}

    # Bumped on every filter change; warn_explicit() compares it with the
    # version stored in a registry to detect stale entries.
    _filters_version = 1

    def _filters_mutated():
        """Record that the filter list changed."""
        global _filters_version
        _filters_version += 1

    _warnings_defaults = False
else:
    defaultaction = _defaultaction
    onceregistry = _onceregistry
    _warnings_defaults = True
551
552
# Module initialization: apply -W options, then install default filters
_processoptions(sys.warnoptions)
if not (_warnings_defaults or hasattr(sys, 'gettotalrefcount')):
    # The accelerator did not supply defaults and this is a regular build:
    # several warning categories are ignored by default.
    filterwarnings("default", category=DeprecationWarning,
                   module="__main__", append=True)
    simplefilter("ignore", category=DeprecationWarning, append=True)
    simplefilter("ignore", category=PendingDeprecationWarning, append=True)
    simplefilter("ignore", category=ImportWarning, append=True)
    simplefilter("ignore", category=ResourceWarning, append=True)

del _warnings_defaults
566