• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import copy
29import datetime
30import pathlib
31import pickle
32import io
33import gc
34import json
35import os
36import queue
37import random
38import re
39import signal
40import socket
41import struct
42import sys
43import tempfile
44from test.support.script_helper import assert_python_ok, assert_python_failure
45from test import support
46import textwrap
47import threading
48import time
49import unittest
50import warnings
51import weakref
52
53import asyncore
54from http.server import HTTPServer, BaseHTTPRequestHandler
55import smtpd
56from urllib.parse import urlparse, parse_qs
57from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
58                          ThreadingTCPServer, StreamRequestHandler)
59
60try:
61    import win32evtlog, win32evtlogutil, pywintypes
62except ImportError:
63    win32evtlog = win32evtlogutil = pywintypes = None
64
65try:
66    import zlib
67except ImportError:
68    pass
69
class BaseTest(unittest.TestCase):
    """Shared fixture for the logging tests.

    setUp() snapshots the logging module's global registries and installs a
    single StreamHandler, writing to an in-memory stream, on the root logger
    (set to DEBUG).  tearDown() removes every root handler and puts the
    snapshotted state back.
    """

    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
    message_num = 0

    def setUp(self):
        """Snapshot global logging state and route root output to a
        StringIO so tests can inspect what was logged."""
        self._threading_key = support.threading_setup()

        known_loggers = logging.getLogger().manager.loggerDict
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = list(logging._handlerList)
            self.saved_loggers = known_loggers.copy()
            self.saved_name_to_level = logging._nameToLevel.copy()
            self.saved_level_to_name = logging._levelToName.copy()
            # Entries without a 'disabled' attribute are recorded as None.
            self.logger_states = {
                name: getattr(entry, 'disabled', None)
                for name, entry in self.saved_loggers.items()
            }
        finally:
            logging._releaseLock()

        # Two loggers that nothing else is expected to have configured.
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)

        unused_loggers = (self.logger1, self.logger2)
        # No handler may be reachable before ours is installed ...
        for unused in unused_loggers:
            if unused.hasHandlers():
                hlist = unused.handlers + self.root_logger.handlers
                raise AssertionError('Unexpected handlers: %s' % hlist)
        self.root_logger.addHandler(self.root_hdlr)
        # ... and afterwards the root handler must be reachable from both.
        for unused in unused_loggers:
            self.assertTrue(unused.hasHandlers())

    def tearDown(self):
        """Detach and close all root handlers, then restore the logging
        state captured by setUp()."""
        self.stream.close()
        root = self.root_logger
        root.removeHandler(self.root_hdlr)
        while root.handlers:
            leftover = root.handlers[0]
            root.removeHandler(leftover)
            leftover.close()
        root.setLevel(self.original_logging_level)

        logging._acquireLock()
        try:
            logging._levelToName.clear()
            logging._levelToName.update(self.saved_level_to_name)
            logging._nameToLevel.clear()
            logging._nameToLevel.update(self.saved_name_to_level)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            manager = logging.getLogger().manager
            manager.disable = 0
            registry = manager.loggerDict
            registry.clear()
            registry.update(self.saved_loggers)
            # Only entries that had a 'disabled' flag get it written back.
            for name, was_disabled in self.logger_states.items():
                if was_disabled is not None:
                    self.saved_loggers[name].disabled = was_disabled
        finally:
            logging._releaseLock()

        self.doCleanups()
        support.threading_cleanup(*self._threading_key)

    def assert_log_lines(self, expected_values, stream=None, pat=None):
        """Check the captured log output line by line.

        Every line of *stream* (default: self.stream) must match *pat*
        (default: self.expected_log_pat), and the tuple of its regex groups
        must equal the corresponding entry of *expected_values*.
        """
        if not stream:
            stream = self.stream
        regex = re.compile(pat if pat else self.expected_log_pat)
        logged = stream.getvalue().splitlines()
        self.assertEqual(len(logged), len(expected_values))
        for line, wanted in zip(logged, expected_values):
            found = regex.search(line)
            if found is None:
                self.fail("Log line does not match expected pattern:\n" +
                          line)
            self.assertEqual(tuple(found.groups()), wanted)
        # Anything still readable from the current position is unexpected.
        remainder = stream.read()
        if remainder:
            self.fail("Remaining output at end of log stream:\n" + remainder)

    def next_message(self):
        """Return the next value of a per-test counter as a decimal
        string ("1", "2", ...)."""
        number = self.message_num + 1
        self.message_num = number
        return "%d" % number
177
178
class BuiltinLevelsTest(BaseTest):
    """Check the built-in levels and how effective levels are inherited."""

    def test_flat(self):
        # Sibling loggers, each with its own explicit level.
        msg = self.next_message

        err = logging.getLogger("ERR")
        err.setLevel(logging.ERROR)
        inf = logging.LoggerAdapter(logging.getLogger("INF"), {})
        inf.setLevel(logging.INFO)
        deb = logging.getLogger("DEB")
        deb.setLevel(logging.DEBUG)

        # (callable, leading positional args) pairs, in emission order.
        passing = (
            (err.log, (logging.CRITICAL,)),
            (err.error, ()),
            (inf.log, (logging.CRITICAL,)),
            (inf.error, ()),
            (inf.warning, ()),
            (inf.info, ()),
            (deb.log, (logging.CRITICAL,)),
            (deb.error, ()),
            (deb.warning, ()),
            (deb.info, ()),
            (deb.debug, ()),
        )
        suppressed = (err.warning, err.info, err.debug, inf.debug)

        # These calls are at or above the logger's level and must appear.
        for emit, leading in passing:
            emit(*leading, msg())
        # These are below the logger's level; they consume message numbers
        # but must not reach the stream.
        for emit in suppressed:
            emit(msg())

        expected = [
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ]
        self.assert_log_lines(expected)

    def test_nested_explicit(self):
        # Parent and child both carry an explicit level.
        msg = self.next_message

        parent = logging.getLogger("INF")
        parent.setLevel(logging.INFO)
        child = logging.getLogger("INF.ERR")
        child.setLevel(logging.ERROR)

        # At or above ERROR: emitted.
        child.log(logging.CRITICAL, msg())
        child.error(msg())

        # Below ERROR: dropped.
        child.warning(msg())
        child.info(msg())
        child.debug(msg())

        expected = [
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ]
        self.assert_log_lines(expected)

    def test_nested_inherited(self):
        # Loggers without their own level take the nearest ancestor's.
        msg = self.next_message

        inf = logging.getLogger("INF")
        inf.setLevel(logging.INFO)
        inf_err = logging.getLogger("INF.ERR")
        inf_err.setLevel(logging.ERROR)
        inf_undef = logging.getLogger("INF.UNDEF")
        inf_err_undef = logging.getLogger("INF.ERR.UNDEF")
        # Created but never logged to.
        logging.getLogger("UNDEF")

        # Emitted: INF.UNDEF inherits INFO, INF.ERR.UNDEF inherits ERROR.
        inf_undef.log(logging.CRITICAL, msg())
        inf_undef.error(msg())
        inf_undef.warning(msg())
        inf_undef.info(msg())
        inf_err_undef.log(logging.CRITICAL, msg())
        inf_err_undef.error(msg())

        # Dropped: below the inherited level.
        inf_undef.debug(msg())
        inf_err_undef.warning(msg())
        inf_err_undef.info(msg())
        inf_err_undef.debug(msg())

        expected = [
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ]
        self.assert_log_lines(expected)

    def test_nested_with_virtual_parent(self):
        # The grandchild is requested before its direct parent exists.
        msg = self.next_message

        top = logging.getLogger("INF")
        grandchild = logging.getLogger("INF.BADPARENT.UNDEF")
        child = logging.getLogger("INF.BADPARENT")
        top.setLevel(logging.INFO)

        # Emitted: both descendants pick up INFO from "INF".
        grandchild.log(logging.FATAL, msg())
        grandchild.info(msg())
        child.log(logging.FATAL, msg())
        child.info(msg())

        # Dropped: DEBUG is below INFO.
        grandchild.debug(msg())
        child.debug(msg())

        expected = [
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ]
        self.assert_log_lines(expected)

    def test_regression_22386(self):
        """getLevelName() maps both ways between 'INFO' and logging.INFO
        (issue #22386)."""
        by_name = logging.getLevelName('INFO')
        by_number = logging.getLevelName(logging.INFO)
        self.assertEqual(by_name, logging.INFO)
        self.assertEqual(by_number, 'INFO')

    def test_issue27935(self):
        # 'FATAL' resolves to the numeric logging.FATAL level.
        self.assertEqual(logging.getLevelName('FATAL'), logging.FATAL)

    def test_regression_29220(self):
        """An empty level name can be registered without disturbing NOTSET
        (issue #29220)."""
        logging.addLevelName(logging.INFO, '')
        # Put the normal name back once the test is over.
        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
        self.assertEqual(logging.getLevelName(logging.INFO), '')
        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
329
class BasicFilterTest(BaseTest):
    """Exercise logging.Filter and plain callables used as filters."""

    def _log_from_spam_family(self):
        # Log one INFO message from each of four "spam" loggers and check
        # that only the "spam.eggs" subtree made it to the stream.
        names = ("spam", "spam.eggs", "spam.eggs.fish", "spam.bakedbeans")
        loggers = [logging.getLogger(name) for name in names]
        for logger in loggers:
            logger.info(self.next_message())

        self.assert_log_lines([
            ('spam.eggs', 'INFO', '2'),
            ('spam.eggs.fish', 'INFO', '3'),
        ])

    def test_filter(self):
        # A name-based Filter on the handler lets through only records
        # from "spam.eggs" and its descendants.
        name_filter = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(name_filter)
            self._log_from_spam_family()
        finally:
            handler.removeFilter(name_filter)

    def test_callable_filter(self):
        # Same expectation as test_filter, with a plain function as filter.

        def only_spam_eggs(record):
            # Compare the first two dotted components of the logger name.
            return '.'.join(record.name.split('.')[:2]) == 'spam.eggs'

        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(only_spam_eggs)
            self._log_from_spam_family()
        finally:
            handler.removeFilter(only_spam_eggs)

    def test_empty_filter(self):
        # A Filter constructed without a name accepts the record.
        accept_all = logging.Filter()
        record = logging.makeLogRecord({'name': 'spam.eggs'})
        self.assertTrue(accept_all.filter(record))
391
#
#   Custom levels used by the tests below. Any number of levels may be
#   defined; they should be integers, the lowest should be > 0, and a larger
#   value means less information gets logged. Level values that cannot obey
#   these rules can be translated between the application and the logging
#   system with a mapping dictionary.
#
BORING = 111
CHATTERBOX = 112
GARRULOUS = 113
TALKATIVE = 114
VERBOSE = 115
SOCIABLE = 116
EFFUSIVE = 117
TERSE = 118
TACITURN = 119
SILENT = 120

# Every custom level, lowest to highest (SILENT included).
LEVEL_RANGE = range(BORING, SILENT + 1)

#
#   Display names for the custom levels. Registering names is optional:
#   without one the system uses "Level n" as the text for the level.
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}
429
class GarrulousFilter(logging.Filter):
    """Filter that rejects records logged at the GARRULOUS level."""

    def filter(self, record):
        """Return False for a GARRULOUS record, True for any other."""
        is_garrulous = record.levelno == GARRULOUS
        return not is_garrulous
436
class VerySpecificFilter(logging.Filter):
    """Filter that rejects records at the SOCIABLE or TACITURN level."""

    def filter(self, record):
        """Return False for a SOCIABLE or TACITURN record, True otherwise."""
        blocked_levels = [SOCIABLE, TACITURN]
        return record.levelno not in blocked_levels
443
444
class CustomLevelsAndFiltersTest(BaseTest):
    """Filtering by level and by filter object, using the custom levels."""

    # Same layout as the base pattern, but the logger name is not captured.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        """Run the base fixture, then register the custom level names."""
        super().setUp()
        for level, label in my_logging_levels.items():
            logging.addLevelName(level, label)

    def log_at_all_levels(self, logger):
        """Log one numbered message on *logger* at each level of
        LEVEL_RANGE, lowest first."""
        for level in LEVEL_RANGE:
            logger.log(level, self.next_message())

    def test_logger_filter(self):
        # Threshold set on the logger: only VERBOSE and above get through.
        self.root_logger.setLevel(VERBOSE)
        self.log_at_all_levels(self.root_logger)
        expected = [
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected)

    def test_handler_filter(self):
        # Threshold set on the handler: only SOCIABLE and above get through.
        root_handler = self.root_logger.handlers[0]
        root_handler.setLevel(SOCIABLE)
        try:
            self.log_at_all_levels(self.root_logger)
            expected = [
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(expected)
        finally:
            # Look the handler up again, as whatever is first at this point
            # is the one to reset.
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # First a filter object on the handler alone, then a second filter
        # object on the logger as well.
        handler = self.root_logger.handlers[0]
        logger_filter = None
        handler_filter = GarrulousFilter()
        handler.addFilter(handler_filter)
        try:
            self.log_at_all_levels(self.root_logger)
            # Message '3' (Garrulous) is absent from the first pass.
            first_pass = [
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_pass)

            logger_filter = VerySpecificFilter()
            self.root_logger.addFilter(logger_filter)
            self.log_at_all_levels(self.root_logger)
            # Second pass: Garrulous ('13') is still absent, and so are
            # Sociable ('16') and Taciturn ('19').
            second_pass = [
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ]
            self.assert_log_lines(first_pass + second_pass)
        finally:
            if logger_filter is not None:
                self.root_logger.removeFilter(logger_filter)
            handler.removeFilter(handler_filter)
532
533
class HandlerTest(BaseTest):
    """Tests for the Handler base class and several bundled handlers."""

    def test_name(self):
        # The name can be assigned and re-assigned; the base class emit()
        # raises NotImplementedError.
        handler = logging.Handler()
        for label in ('generic', 'anothergeneric'):
            handler.name = label
            self.assertEqual(handler.name, label)
        with self.assertRaises(NotImplementedError):
            handler.emit(None)

    def test_builtin_handlers(self):
        # Most handlers cannot really be *used* from a test, but they can at
        # least be instantiated with a variety of options.
        if sys.platform in ('linux', 'darwin'):
            for existing in (True, False):
                fd, path = tempfile.mkstemp()
                os.close(fd)
                if not existing:
                    os.unlink(path)
                watched = logging.handlers.WatchedFileHandler(path,
                                                              delay=True)
                # With delay=True both dev and ino are -1 at this point.
                self.assertEqual(watched.dev, -1)
                self.assertEqual(watched.ino, -1)
                if existing:
                    record = logging.makeLogRecord({'msg': 'Test'})
                    watched.handle(record)
                    # Delete the file behind the handler's back ...
                    os.unlink(path)
                    self.assertFalse(os.path.exists(path))
                    # ... and check that the next record recreates it.
                    watched.handle(record)
                    self.assertTrue(os.path.exists(path))
                watched.close()
                if existing:
                    os.unlink(path)

            sockname = ('/var/run/syslog' if sys.platform == 'darwin'
                        else '/dev/log')
            try:
                syslog = logging.handlers.SysLogHandler(sockname)
                self.assertEqual(syslog.facility, syslog.LOG_USER)
                self.assertTrue(syslog.unixsocket)
                syslog.close()
            except OSError:  # syslogd might not be available
                pass

        for method in ('GET', 'POST', 'PUT'):
            if method != 'PUT':
                http = logging.handlers.HTTPHandler('localhost', '/log',
                                                    method)
                http.close()
            else:
                # Only GET and POST are accepted.
                with self.assertRaises(ValueError):
                    logging.handlers.HTTPHandler('localhost', '/log', method)

        record = logging.makeLogRecord({})
        # Capacity 0: an empty buffer already asks to be flushed.
        buffering = logging.handlers.BufferingHandler(0)
        self.assertTrue(buffering.shouldFlush(record))
        buffering.close()
        # Capacity 1: an empty buffer does not.
        buffering = logging.handlers.BufferingHandler(1)
        self.assertFalse(buffering.shouldFlush(record))
        buffering.close()

    def test_path_objects(self):
        """
        File-based handlers accept a pathlib.Path as the filename.

        See Issue #27493.
        """
        fd, name = tempfile.mkstemp()
        os.close(fd)
        os.unlink(name)
        as_path = pathlib.Path(name)
        cases = [
            (logging.FileHandler, (as_path, 'w')),
            (logging.handlers.RotatingFileHandler, (as_path, 'a')),
            (logging.handlers.TimedRotatingFileHandler, (as_path, 'h')),
        ]
        if sys.platform in ('linux', 'darwin'):
            cases.append((logging.handlers.WatchedFileHandler,
                          (as_path, 'w')))
        for handler_class, ctor_args in cases:
            handler = handler_class(*ctor_args)
            # Constructing the handler must have created the file.
            self.assertTrue(os.path.exists(name))
            handler.close()
            os.unlink(name)

    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
    def test_race(self):
        # Issue #14632: a WatchedFileHandler keeps handling records while
        # another thread repeatedly deletes its log file.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                    self.deletion_time = time.time()
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        # Timestamps reported if a handle() call fails.
        self.handle_time = None
        self.deletion_time = None

        for delay in (False, True):
            fd, log_path = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop,
                                       args=(log_path, del_count))
            remover.daemon = True
            remover.start()
            handler = logging.handlers.WatchedFileHandler(log_path,
                                                          delay=delay)
            formatter = logging.Formatter(
                '%(asctime)s: %(levelname)s: %(message)s')
            handler.setFormatter(formatter)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    record = logging.makeLogRecord({'msg': 'testing'})
                    try:
                        self.handle_time = time.time()
                        handler.handle(record)
                    except Exception:
                        print('Deleted at %s, '
                              'opened at %s' % (self.deletion_time,
                                                self.handle_time))
                        raise
            finally:
                remover.join()
                handler.close()
                if os.path.exists(log_path):
                    os.unlink(log_path)

    # The implementation depends on os.register_at_fork, but the skip
    # condition looks at os.fork because that is what users, and this test,
    # actually call.  This helps ensure that wherever fork exists the
    # register_at_fork mechanism is also present and used.
    @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
    def test_post_fork_child_no_deadlock(self):
        """A forked child must not inherit held logging locks
        (bpo-6721 & bpo-36533)."""
        class _OurHandler(logging.Handler):
            def __init__(self):
                super().__init__()
                self.sub_handler = logging.StreamHandler(
                    stream=open('/dev/null', 'wt'))

            def emit(self, record):
                # Emit through the nested handler while holding its lock.
                self.sub_handler.acquire()
                try:
                    self.sub_handler.emit(record)
                finally:
                    self.sub_handler.release()

        self.assertEqual(len(logging._handlers), 0)
        refed_h = _OurHandler()
        self.addCleanup(refed_h.sub_handler.stream.close)
        refed_h.name = 'because we need at least one for this test'
        self.assertGreater(len(logging._handlers), 0)
        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
        test_logger.addHandler(refed_h)
        test_logger.setLevel(logging.DEBUG)

        ready_to_fork = threading.Event()
        fork_done = threading.Event()

        def lock_holder_thread_fn():
            logging._acquireLock()
            try:
                refed_h.acquire()
                try:
                    # Both locks are held: let the main thread fork now.
                    ready_to_fork.set()

                    # With the deadlock bug present, the fork would happen
                    # without the held locks being dealt with, and the child
                    # would deadlock.

                    # Hold the locks until the fork is reported or a fixed
                    # timeout passes.  Avoiding a timing-based test would
                    # need os.fork() itself to signal when it has happened;
                    # for a regression test of a fixed issue, potentially
                    # less reliable detection via timing is an acceptable
                    # price for simplicity.
                    fork_done.wait(0.5)
                finally:
                    refed_h.release()
            finally:
                logging._releaseLock()

        lock_holder_thread = threading.Thread(
                target=lock_holder_thread_fn,
                name='test_post_fork_child_no_deadlock lock holder')
        lock_holder_thread.start()

        ready_to_fork.wait()
        pid = os.fork()
        if pid == 0:  # Child.
            try:
                test_logger.info(r'Child process did not deadlock. \o/')
            finally:
                os._exit(0)

        # Parent (the child always leaves through os._exit above).
        test_logger.info(r'Parent process returned from fork. \o/')
        fork_done.set()
        lock_holder_thread.join()
        start_time = time.monotonic()
        while True:
            test_logger.debug('Waiting for child process.')
            waited_pid, status = os.waitpid(pid, os.WNOHANG)
            # Stop when the child has exited, or after waiting so long that
            # a deadlocked child is the likely explanation.
            if waited_pid == pid or time.monotonic() - start_time > 7:
                break
            time.sleep(0.05)
        test_logger.debug('Done waiting.')
        if waited_pid != pid:
            os.kill(pid, signal.SIGKILL)
            waited_pid, status = os.waitpid(pid, 0)
            self.fail("child process deadlocked.")
        self.assertEqual(status, 0, msg="child process error")
754
755
class BadStream:
    """Stream stand-in whose write() always fails."""

    def write(self, data):
        """Raise RuntimeError regardless of *data*."""
        failure = RuntimeError('deliberate mistake')
        raise failure
759
class TestStreamHandler(logging.StreamHandler):
    """StreamHandler that remembers the record passed to handleError()."""

    def handleError(self, record):
        """Store *record* on the handler as ``error_record``."""
        # Overrides the inherited handleError(); the record is only stored.
        self.error_record = record
763
class StreamWithIntName:
    """Stream stand-in whose ``name`` attribute is an int, not a str."""

    name = 2
    level = logging.NOTSET
767
class StreamHandlerTest(BaseTest):
    """Tests for StreamHandler error handling, setStream() and repr()."""

    def test_error_handling(self):
        # A failing stream write is routed to handleError(); what the stock
        # handleError() prints depends on logging.raiseExceptions.
        record = logging.makeLogRecord({})
        saved_flag = logging.raiseExceptions
        recording = TestStreamHandler(BadStream())

        try:
            recording.handle(record)
            self.assertIs(recording.error_record, record)

            stock = logging.StreamHandler(BadStream())
            # raiseExceptions as found: the error is reported on stderr.
            with support.captured_stderr() as stderr:
                stock.handle(record)
                expected = '\nRuntimeError: deliberate mistake\n'
                self.assertIn(expected, stderr.getvalue())

            # raiseExceptions off: nothing is written to stderr.
            logging.raiseExceptions = False
            with support.captured_stderr() as stderr:
                stock.handle(record)
                self.assertEqual('', stderr.getvalue())
        finally:
            logging.raiseExceptions = saved_flag

    def test_stream_setting(self):
        """
        setStream() returns the previous stream, or None if unchanged.
        """
        handler = logging.StreamHandler()
        replacement = io.StringIO()
        previous = handler.setStream(replacement)
        self.assertIs(previous, sys.stderr)
        returned = handler.setStream(previous)
        self.assertIs(returned, replacement)
        # Setting the stream it already has returns None.
        returned = handler.setStream(previous)
        self.assertIsNone(returned)

    def test_can_represent_stream_with_int_name(self):
        # repr() works when the stream's name is an integer.
        handler = logging.StreamHandler(StreamWithIntName())
        self.assertEqual(repr(handler), '<StreamHandler 2 (NOTSET)>')
808
809# -- The following section could be moved into a server_helper.py module
810# -- if it proves to be of wider utility than just test_logging
811
class TestSMTPServer(smtpd.SMTPServer):
    """
    This class implements a test SMTP server.

    :param addr: A (host, port) tuple which the server listens on.
                 You can specify a port value of zero: the server's
                 *port* attribute will hold the actual port number
                 used, which can be used in client connections.
    :param handler: A callable which will be called to process
                    incoming messages. The handler will be passed
                    the client address tuple, who the message is from,
                    a list of recipients and the message data.
    :param poll_interval: The interval, in seconds, used in the underlying
                          :func:`select` or :func:`poll` call by
                          :func:`asyncore.loop`.
    :param sockmap: A dictionary which will be used to hold
                    :class:`asyncore.dispatcher` instances used by
                    :func:`asyncore.loop`. This avoids changing the
                    :mod:`asyncore` module's global state.
    """

    def __init__(self, addr, handler, poll_interval, sockmap):
        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
                                  decode_data=True)
        # Publish the port actually bound (relevant when port 0 was given).
        self.port = self.socket.getsockname()[1]
        self._handler = handler
        # Set by start(); None while no server thread is running.
        self._thread = None
        self.poll_interval = poll_interval

    def process_message(self, peer, mailfrom, rcpttos, data):
        """
        Delegates to the handler passed in to the server's constructor.

        Typically, this will be a test case method.
        :param peer: The client (host, port) tuple.
        :param mailfrom: The address of the sender.
        :param rcpttos: The addresses of the recipients.
        :param data: The message.
        """
        self._handler(peer, mailfrom, rcpttos, data)

    def start(self):
        """
        Start the server running on a separate daemon thread.
        """
        # Pass daemon=True to the constructor rather than calling the
        # deprecated Thread.setDaemon().
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,),
                                            daemon=True)
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the :mod:`asyncore` loop until normal termination
        conditions arise.
        :param poll_interval: The interval, in seconds, used in the underlying
                              :func:`select` or :func:`poll` call by
                              :func:`asyncore.loop`.
        """
        asyncore.loop(poll_interval, map=self._map)

    def stop(self, timeout=None):
        """
        Stop the thread by closing the server instance.
        Wait for the server thread to terminate.

        Safe to call on a server that was never started (or was already
        stopped): the join is skipped when there is no thread.

        :param timeout: How long to wait for the server thread
                        to terminate.
        """
        self.close()
        # Mirror ControlMixin.stop(): only join a thread that exists.
        if self._thread is not None:
            support.join_thread(self._thread, timeout)
            self._thread = None
        asyncore.close_all(map=self._map, ignore_all=True)
884
885
class ControlMixin(object):
    """
    This mixin is used to start a server on a separate thread, and
    shut it down programmatically. Request handling is simplified - instead
    of needing to derive a suitable RequestHandler subclass, you just
    provide a callable which will be passed each received request to be
    processed.

    The class it is mixed into must supply ``serve_forever(poll_interval)``,
    ``shutdown()`` and ``server_close()``, which this mixin calls.

    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request. This handler is called on the
                    server thread, effectively meaning that requests are
                    processed serially. While not quite Web scale ;-),
                    this should be fine for testing applications.
    :param poll_interval: The polling interval in seconds.
    """
    def __init__(self, handler, poll_interval):
        # Set by start(); None while no server thread is running.
        self._thread = None
        self.poll_interval = poll_interval
        self._handler = handler
        # Set just before the service loop is entered, cleared by stop().
        self.ready = threading.Event()

    def start(self):
        """
        Create a daemon thread to run the server, and start it.
        """
        # Pass daemon=True to the constructor rather than calling the
        # deprecated Thread.setDaemon().
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,),
                                            daemon=True)
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the server. Set the ready flag before entering the
        service loop.

        :param poll_interval: Passed through to the next class's
                              ``serve_forever``.
        """
        self.ready.set()
        super().serve_forever(poll_interval)

    def stop(self, timeout=None):
        """
        Tell the server thread to stop, and wait for it to do so.

        :param timeout: How long to wait for the server thread
                        to terminate.
        """
        self.shutdown()
        # The thread only exists if start() was called.
        if self._thread is not None:
            support.join_thread(self._thread, timeout)
            self._thread = None
        self.server_close()
        self.ready.clear()
938
class TestHTTPServer(ControlMixin, HTTPServer):
    """
    An HTTP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval in seconds.
    :param log: Pass ``True`` to enable log messages.
    :param sslctx: If given, its ``wrap_socket`` method is applied
                   (with ``server_side=True``) to every accepted
                   connection.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 log=False, sslctx=None):
        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
            def __getattr__(self, name, default=None):
                # Reached only when normal lookup fails: hand every
                # do_XXX lookup to process_request, reject the rest.
                if not name.startswith('do_'):
                    raise AttributeError(name)
                return self.process_request

            def process_request(self):
                # Let the callable given to the server deal with it.
                self.server._handler(self)

            def log_message(self, format, *args):
                # Quiet unless the server was created with log=True.
                if not log:
                    return
                super().log_message(format, *args)

        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
        ControlMixin.__init__(self, handler, poll_interval)
        self.sslctx = sslctx

    def get_request(self):
        """
        Accept a connection, wrapping it with ``sslctx`` when one is set.

        :return: The ``(socket, client_address)`` pair.
        """
        try:
            conn, client = self.socket.accept()
            if self.sslctx:
                conn = self.sslctx.wrap_socket(conn, server_side=True)
        except OSError as exc:
            # socket errors are silenced by the caller, print them here
            sys.stderr.write("Got an error:\n%s\n" % exc)
            raise
        return conn, client
979
class TestTCPServer(ControlMixin, ThreadingTCPServer):
    """
    A TCP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a single
                    parameter - the request - in order to process the request.
    :param poll_interval: The polling interval in seconds.
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen on it.
    """

    allow_reuse_address = True

    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingTCPRequestHandler(StreamRequestHandler):

            def handle(self):
                # Pass the request handler itself to the server's callable.
                self.server._handler(self)

        ThreadingTCPServer.__init__(
            self, addr, DelegatingTCPRequestHandler,
            bind_and_activate=bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)

    def server_bind(self):
        """Bind the socket, then record the bound port as ``self.port``."""
        super().server_bind()
        bound_address = self.socket.getsockname()
        self.port = bound_address[1]
1010
class TestUDPServer(ControlMixin, ThreadingUDPServer):
    """
    A UDP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds.
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingUDPRequestHandler(DatagramRequestHandler):

            def handle(self):
                # Pass the request handler itself to the server's callable.
                self.server._handler(self)

            def finish(self):
                # Nothing was written in reply: skip the base finish().
                if not self.wfile.getvalue():
                    return
                try:
                    super().finish()
                except OSError:
                    # Tolerated only once the server has been closed.
                    if not self.server._closed:
                        raise

        ThreadingUDPServer.__init__(
            self, addr, DelegatingUDPRequestHandler,
            bind_and_activate=bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)
        self._closed = False

    def server_bind(self):
        """Bind the socket, then record the bound port as ``self.port``."""
        super().server_bind()
        bound_address = self.socket.getsockname()
        self.port = bound_address[1]

    def server_close(self):
        """Close the server and remember that it has been closed."""
        super().server_close()
        self._closed = True
1057
# The Unix-domain variants can only be defined where the platform's
# socket module provides AF_UNIX.
if hasattr(socket, 'AF_UNIX'):
    class TestUnixStreamServer(TestTCPServer):
        """:class:`TestTCPServer` using the AF_UNIX address family."""
        address_family = socket.AF_UNIX

    class TestUnixDatagramServer(TestUDPServer):
        """:class:`TestUDPServer` using the AF_UNIX address family."""
        address_family = socket.AF_UNIX
1064
1065# - end of server_helper section
1066
class SMTPHandlerTest(BaseTest):
    # bpo-14314, bpo-19665, bpo-34092: don't wait forever, timeout of 1 minute
    TIMEOUT = 60.0

    def test_basic(self):
        # Send one record through an SMTPHandler to a local test SMTP
        # server and check what the server's callback received.
        smtp_map = {}
        smtp_server = TestSMTPServer((support.HOST, 0),
                                     self.process_message, 0.001, smtp_map)
        smtp_server.start()
        handler = logging.handlers.SMTPHandler(
            (support.HOST, smtp_server.port), 'me', 'you', 'Log',
            timeout=self.TIMEOUT)
        self.assertEqual(handler.toaddrs, ['you'])
        self.messages = []
        record = logging.makeLogRecord({'msg': 'Hello \u2713'})
        self.handled = threading.Event()
        handler.handle(record)
        # process_message() sets the event once the mail has arrived.
        self.handled.wait(self.TIMEOUT)
        smtp_server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        _peer, sender, recipients, body = self.messages[0]
        self.assertEqual(sender, 'me')
        self.assertEqual(recipients, ['you'])
        self.assertIn('\nSubject: Log\n', body)
        self.assertTrue(body.endswith('\n\nHello \u2713'))
        handler.close()

    def process_message(self, *args):
        # Server callback: record the message, then wake up test_basic().
        self.messages.append(args)
        self.handled.set()
1098
class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        # Capacity 10, flush level WARNING, target: the root handler.
        self.mem_hdlr = logging.handlers.MemoryHandler(
            10, logging.WARNING, self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        #  criteria (message count and message level).
        log = self.mem_logger
        log.debug(self.next_message())
        self.assert_log_lines([])
        log.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        log.warning(self.next_message())
        expected = [('DEBUG', '1'), ('INFO', '2'), ('WARNING', '3')]
        self.assert_log_lines(expected)
        for first in (4, 14):
            # Nine buffered messages: nothing new reaches the target yet.
            for _ in range(9):
                log.debug(self.next_message())
            self.assert_log_lines(expected)
            # This will flush because it's the 10th message since the last
            #  flush.
            log.debug(self.next_message())
            flushed = [('DEBUG', str(num)) for num in range(first, first + 10)]
            expected = expected + flushed
            self.assert_log_lines(expected)

        # A single further message stays buffered.
        log.debug(self.next_message())
        self.assert_log_lines(expected)

    def test_flush_on_close(self):
        """
        Check both settings of flush-on-close: the default (flush) and
        the explicit opt-out.
        """
        log = self.mem_logger
        log.debug(self.next_message())
        self.assert_log_lines([])
        log.info(self.next_message())
        self.assert_log_lines([])
        log.removeHandler(self.mem_hdlr)
        # Default behaviour is to flush on close. Check that it happens.
        self.mem_hdlr.close()
        expected = [('DEBUG', '1'), ('INFO', '2')]
        self.assert_log_lines(expected)
        # Now configure for flushing not to be done on close.
        self.mem_hdlr = logging.handlers.MemoryHandler(
            10, logging.WARNING, self.root_hdlr, False)
        log.addHandler(self.mem_hdlr)
        log.debug(self.next_message())
        self.assert_log_lines(expected)  # no change
        log.info(self.next_message())
        self.assert_log_lines(expected)  # no change
        log.removeHandler(self.mem_hdlr)
        self.mem_hdlr.close()
        # assert that no new lines have been added
        self.assert_log_lines(expected)  # no change
1175
1176
class ExceptionFormatter(logging.Formatter):
    """Formatter that reduces exception info to the exception class name."""
    def formatException(self, ei):
        # ei is an exc_info-style tuple; only its first item (the type)
        # is used.
        exc_type = ei[0]
        return "Got a [%s]" % (exc_type.__name__,)
1181
1182
class ConfigFileTest(BaseTest):

    """Tests for logging.config.fileConfig() with .ini-style input."""

    check_no_resource_warning = support.check_no_resource_warning
    expected_log_pat = r"^(\w+) \+\+ (\w+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1a moves the handler to the root.
    config1a = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    # config7 adds a compiler logger, and uses kwargs instead of args.
    config7 = """
    [loggers]
    keys=root,parser,compiler

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_compiler]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    kwargs={'stream': sys.stdout,}

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config 8, check for resource warning
    config8 = r"""
    [loggers]
    keys=root

    [handlers]
    keys=file

    [formatters]
    keys=

    [logger_root]
    level=DEBUG
    handlers=file

    [handler_file]
    class=FileHandler
    level=DEBUG
    args=("{tempfile}",)
    """

    # Minimal configuration used by test_logger_disabling.
    disable_test = """
    [loggers]
    keys=root

    [handlers]
    keys=screen

    [formatters]
    keys=

    [logger_root]
    level=DEBUG
    handlers=screen

    [handler_screen]
    level=DEBUG
    class=StreamHandler
    args=(sys.stdout,)
    formatter=
    """

    def apply_config(self, conf, **kwargs):
        # Dedent the configuration text and hand it to fileConfig() as an
        # in-memory file; extra keyword arguments are passed through.
        stream = io.StringIO(textwrap.dedent(conf))
        logging.config.fileConfig(stream, **kwargs)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with support.captured_stdout() as captured:
            self.apply_config(self.config0)
            root = logging.getLogger()
            # Won't output anything
            root.info(self.next_message())
            # Outputs a message
            root.error(self.next_message())
            expected = [('ERROR', '2')]
            self.assert_log_lines(expected, stream=captured)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config0_using_cp_ok(self):
        # Same as test_config0_ok, but fileConfig() is given an already
        # populated ConfigParser instead of a file object.
        with support.captured_stdout() as captured:
            source = io.StringIO(textwrap.dedent(self.config0))
            parser = configparser.ConfigParser()
            parser.read_file(source)
            logging.config.fileConfig(parser)
            root = logging.getLogger()
            # Won't output anything
            root.info(self.next_message())
            # Outputs a message
            root.error(self.next_message())
            expected = [('ERROR', '2')]
            self.assert_log_lines(expected, stream=captured)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well.
        with support.captured_stdout() as captured:
            self.apply_config(config)
            parser_log = logging.getLogger("compiler.parser")
            # Both will output a message
            parser_log.info(self.next_message())
            parser_log.error(self.next_message())
            expected = [('INFO', '1'), ('ERROR', '2')]
            self.assert_log_lines(expected, stream=captured)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # config2 names a stream that does not exist ("sys.stbout");
        # applying it must raise.
        with self.assertRaises(Exception):
            self.apply_config(self.config2)

    def test_config3_failure(self):
        # config3 refers to a formatter that is not defined; applying it
        # must raise.
        with self.assertRaises(Exception):
            self.apply_config(self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with support.captured_stdout() as captured:
            self.apply_config(self.config4)
            root = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            expected_text = "ERROR:root:just testing\nGot a [RuntimeError]\n"
            self.assertEqual(captured.getvalue(), expected_text)
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        # Same expectations as config1, with a dotted handler class name.
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        # Same expectations as config1, with ', ' separated key lists.
        self.test_config1_ok(config=self.config6)

    def test_config7_ok(self):
        # First pass: config1a.
        with support.captured_stdout() as captured:
            self.apply_config(self.config1a)
            parser_log = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            parser_log.info(self.next_message())
            parser_log.error(self.next_message())
            hyphenated.critical(self.next_message())
            expected = [
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ]
            self.assert_log_lines(expected, stream=captured)
            # Original logger output is empty.
            self.assert_log_lines([])
        # Second pass: config7 applied on top of the first.
        with support.captured_stdout() as captured:
            self.apply_config(self.config7)
            parser_log = logging.getLogger("compiler.parser")
            self.assertFalse(parser_log.disabled)
            # Both will output a message
            parser_log.info(self.next_message())
            parser_log.error(self.next_message())
            lexer_log = logging.getLogger("compiler.lexer")
            # Both will output a message
            lexer_log.info(self.next_message())
            lexer_log.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            expected = [
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ]
            self.assert_log_lines(expected, stream=captured)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config8_ok(self):

        def dispose(file_handler, path):
            # Close the handler before deleting the file it writes to.
            file_handler.close()
            os.remove(path)

        with self.check_no_resource_warning():
            handle, log_path = tempfile.mkstemp(suffix=".log",
                                                prefix="test_logging-X-")
            os.close(handle)

            # Replace single backslash with double backslash in windows
            # to avoid unicode error during string formatting
            if os.name == "nt":
                log_path = log_path.replace("\\", "\\\\")

            rendered = self.config8.format(tempfile=log_path)

            # Applying the same file-based configuration twice must not
            # produce a resource warning.
            self.apply_config(rendered)
            self.apply_config(rendered)

        current = logging.root.handlers[0]
        self.addCleanup(dispose, current, log_path)

    def test_logger_disabling(self):
        # A logger created between two fileConfig() calls is disabled by
        # the second call unless disable_existing_loggers=False is given.
        self.apply_config(self.disable_test)
        pristine = logging.getLogger('some_pristine_logger')
        self.assertFalse(pristine.disabled)
        self.apply_config(self.disable_test)
        self.assertTrue(pristine.disabled)
        self.apply_config(self.disable_test, disable_existing_loggers=False)
        self.assertFalse(pristine.disabled)

    def test_defaults_do_no_interpolation(self):
        """bpo-33802 defaults should not get interpolated"""
        ini = textwrap.dedent("""
            [formatters]
            keys=default

            [formatter_default]

            [handlers]
            keys=console

            [handler_console]
            class=logging.StreamHandler
            args=tuple()

            [loggers]
            keys=root

            [logger_root]
            formatter=default
            handlers=console
            """).strip()
        # The defaults deliberately contain '%' format strings.
        generic_formatter = {
            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
            "class": "logging.Formatter"
        }
        defaults = {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {"generic": generic_formatter},
        }
        handle, ini_path = tempfile.mkstemp(prefix='test_logging_',
                                            suffix='.ini')
        try:
            os.write(handle, ini.encode('ascii'))
            os.close(handle)
            logging.config.fileConfig(ini_path, defaults=defaults)
        finally:
            os.unlink(ini_path)
1636
1637
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    server_class = TestTCPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_socket, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            # Remembered so that the tests can skip themselves.
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SocketHandler
        # A tuple address means host/port; anything else is handed to the
        # handler as-is with no port.
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        # Released once per record received by handle_socket().
        self.handled = threading.Semaphore(0)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
            if self.server:
                self.server.stop(2.0)
        finally:
            BaseTest.tearDown(self)

    @staticmethod
    def _recv_exact(conn, size):
        """Read *size* bytes from *conn*, stopping early only at end of stream.

        :param conn: A connected socket.
        :param size: The number of bytes wanted.
        :return: The bytes read; shorter than *size* only if the peer
                 closed the connection first.
        """
        data = conn.recv(size)
        while len(data) < size:
            more = conn.recv(size - len(data))
            if not more:
                # recv() returning nothing means the peer has closed the
                # connection; retrying would loop forever.
                break
            data += more
        return data

    def handle_socket(self, request):
        """Server callback: decode each length-prefixed pickled record sent
        on the connection and append its message to ``self.log_output``.

        Returns when the stream ends, including when it ends part-way
        through a record (the partial record is dropped).

        :param request: The request handler; its ``connection`` attribute
                        is the socket to read from.
        """
        conn = request.connection
        while True:
            # Each record is a 4-byte big-endian length, then the payload.
            header = self._recv_exact(conn, 4)
            if len(header) < 4:
                break
            slen = struct.unpack(">L", header)[0]
            payload = self._recv_exact(conn, slen)
            if len(payload) < slen:
                break
            # NOTE: unpickling is only acceptable because the data comes
            # from the handler under test; never do this with untrusted
            # input.
            obj = pickle.loads(payload)
            record = logging.makeLogRecord(obj)
            self.log_output += record.msg + '\n'
            self.handled.release()

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("tcp")
        logger.error("spam")
        self.handled.acquire()
        logger.debug("eggs")
        self.handled.acquire()
        self.assertEqual(self.log_output, "spam\neggs\n")

    def test_noserver(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # Avoid timing-related failures due to SocketHandler's own hard-wired
        # one-second timeout on socket.create_connection() (issue #16264).
        self.sock_hdlr.retryStart = 2.5
        # Kill the server
        self.server.stop(2.0)
        # The logging call should try to connect, which should fail
        try:
            raise RuntimeError('Deliberate mistake')
        except RuntimeError:
            self.root_logger.exception('Never sent')
        self.root_logger.error('Never sent, either')
        now = time.time()
        self.assertGreater(self.sock_hdlr.retryTime, now)
        # Wait until the handler is allowed to retry, then log once more.
        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
        self.root_logger.error('Nor this')
1726
def _get_temp_domain_socket():
    """Return a fresh path suitable for binding a Unix domain socket.

    A temporary file is created only to obtain a unique name and is deleted
    again before returning: the path must not exist when a socket is bound
    to it, or binding fails with 'address already in use'.
    """
    handle, path = tempfile.mkstemp(suffix='.sock', prefix='test_logging_')
    os.close(handle)
    os.unlink(path)
    return path
1734
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSocketHandlerTest(SocketHandlerTest):

    """Test for SocketHandler with unix sockets."""

    # Only reference the Unix server class where AF_UNIX exists
    # (presumably it is not defined otherwise -- confirm against its
    # definition).
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixStreamServer

    def setUp(self):
        """Pick a fresh socket path, then run the inherited setup."""
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        # Registered as a cleanup (rather than done at the end of tearDown)
        # so the socket file is removed even if setUp or tearDown raises.
        self.addCleanup(support.unlink, self.address)
        SocketHandlerTest.setUp(self)
1751
class DatagramHandlerTest(BaseTest):

    """Test for DatagramHandler."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Start a UDP server and send root-logger output to it.

        A DatagramHandler aimed at the server replaces the root logger's
        first handler. If starting the server fails with OSError, the error
        is kept in ``server_exception`` and the rest of the setup is skipped.
        """
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = None
        self.sock_hdlr = None
        self.server_exception = None
        try:
            udp_server = self.server_class(self.address,
                                           self.handle_datagram, 0.01)
            self.server = udp_server
            udp_server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as exc:
            self.server_exception = exc
            return
        udp_server.ready.wait()
        bound_address = udp_server.server_address
        if isinstance(bound_address, tuple):
            # Tuple-style address: use the port reported by the server.
            handler = logging.handlers.DatagramHandler('localhost',
                                                       udp_server.port)
        else:
            # Any other address is passed through unchanged, with no port.
            handler = logging.handlers.DatagramHandler(bound_address, None)
        self.sock_hdlr = handler
        self.log_output = ''
        root = self.root_logger
        root.removeHandler(root.handlers[0])
        root.addHandler(handler)
        self.handled = threading.Event()

    def tearDown(self):
        """Stop the UDP server, then detach and close the handler."""
        try:
            udp_server = self.server
            if udp_server:
                udp_server.stop(2.0)
            handler = self.sock_hdlr
            if handler:
                self.root_logger.removeHandler(handler)
                handler.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        """Unpickle one received datagram and record its message."""
        # The pickled data follows a packed '>L' prefix; skip over it.
        prefix_size = struct.calcsize('>L')
        attrs = pickle.loads(request.packet[prefix_size:])
        received = logging.makeLogRecord(attrs)
        self.log_output += received.msg + '\n'
        self.handled.set()

    def test_output(self):
        """Records sent through the DatagramHandler are received in order."""
        if self.server_exception:
            self.skipTest(self.server_exception)
        udp_logger = logging.getLogger("udp")
        arrived = self.handled
        udp_logger.error("spam")
        arrived.wait()
        # Re-arm the event so the second wait really waits for "eggs".
        arrived.clear()
        udp_logger.error("eggs")
        arrived.wait()
        self.assertEqual(self.log_output, "spam\neggs\n")
1815
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixDatagramHandlerTest(DatagramHandlerTest):

    """Test for DatagramHandler using Unix sockets."""

    # Only reference the Unix server class where AF_UNIX exists
    # (presumably it is not defined otherwise -- confirm against its
    # definition).
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        """Pick a fresh socket path, then run the inherited setup."""
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        # Registered as a cleanup (rather than done at the end of tearDown)
        # so the socket file is removed even if setUp or tearDown raises.
        self.addCleanup(support.unlink, self.address)
        DatagramHandlerTest.setUp(self)
1832
class SysLogHandlerTest(BaseTest):

    """Test for SysLogHandler using UDP."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Start a UDP server and send root-logger output to it.

        A SysLogHandler aimed at the server replaces the root logger's first
        handler. If starting the server fails with OSError, the error is
        kept in ``server_exception`` and the rest of the setup is skipped.
        """
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = None
        self.sl_hdlr = None
        self.server_exception = None
        try:
            udp_server = self.server_class(self.address,
                                           self.handle_datagram, 0.01)
            self.server = udp_server
            udp_server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as exc:
            self.server_exception = exc
            return
        udp_server.ready.wait()
        bound_address = udp_server.server_address
        if isinstance(bound_address, tuple):
            # Tuple-style address: keep the host, use the server's port.
            target = (bound_address[0], udp_server.port)
        else:
            target = bound_address
        handler = logging.handlers.SysLogHandler(target)
        self.sl_hdlr = handler
        self.log_output = ''
        root = self.root_logger
        root.removeHandler(root.handlers[0])
        root.addHandler(handler)
        self.handled = threading.Event()

    def tearDown(self):
        """Stop the server, then detach and close the handler."""
        try:
            udp_server = self.server
            if udp_server:
                udp_server.stop(2.0)
            handler = self.sl_hdlr
            if handler:
                self.root_logger.removeHandler(handler)
                handler.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        """Keep the raw bytes of the latest datagram and signal arrival."""
        self.log_output = request.packet
        self.handled.set()

    def test_output(self):
        """The bytes on the wire follow ``append_nul`` and ``ident``."""
        if self.server_exception:
            self.skipTest(self.server_exception)
        # The log message sent to the SysLogHandler is properly received.
        slh_logger = logging.getLogger("slh")

        def log_and_expect(datagram):
            # Send the same message each time; only handler settings vary.
            slh_logger.error("sp\xe4m")
            self.handled.wait()
            self.assertEqual(self.log_output, datagram)

        log_and_expect(b'<11>sp\xc3\xa4m\x00')
        self.handled.clear()
        self.sl_hdlr.append_nul = False
        log_and_expect(b'<11>sp\xc3\xa4m')
        self.handled.clear()
        self.sl_hdlr.ident = "h\xe4m-"
        log_and_expect(b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1899
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with Unix sockets."""

    # Only reference the Unix server class where AF_UNIX exists
    # (presumably it is not defined otherwise -- confirm against its
    # definition).
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        """Pick a fresh socket path, then run the inherited setup."""
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        # Registered as a cleanup (rather than done at the end of tearDown)
        # so the socket file is removed even if setUp or tearDown raises.
        self.addCleanup(support.unlink, self.address)
        SysLogHandlerTest.setUp(self)
1916
@unittest.skipUnless(support.IPV6_ENABLED,
                     'IPv6 support required for this test.')
class IPv6SysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with IPv6 host."""

    server_class = TestUDPServer
    address = ('::1', 0)

    def setUp(self):
        """Switch the server class to IPv6, then run the inherited setup."""
        # address_family is changed on the server class itself, which other
        # tests use too. tearDown is not run when setUp fails, so the reset
        # to IPv4 is also registered as a cleanup to guarantee it happens.
        self.addCleanup(setattr, self.server_class, 'address_family',
                        socket.AF_INET)
        self.server_class.address_family = socket.AF_INET6
        super(IPv6SysLogHandlerTest, self).setUp()

    def tearDown(self):
        """Put the server class back on IPv4, then run the inherited teardown."""
        self.server_class.address_family = socket.AF_INET
        super(IPv6SysLogHandlerTest, self).tearDown()
1933
class HTTPHandlerTest(BaseTest):
    """Test for HTTPHandler."""

    def setUp(self):
        """Create the event that signals a request has been handled.

        The HTTP servers and handlers themselves are created inside
        test_output, one pair per plain/secure pass.
        """
        BaseTest.setUp(self)
        self.handled = threading.Event()

    def handle_request(self, request):
        """Record what the server received and answer with a 200.

        Stores the HTTP command, the parsed request path and, for POST,
        the request body, then sets ``self.handled``.
        """
        self.command = request.command
        self.log_data = urlparse(request.path)
        if self.command == 'POST':
            try:
                rlen = int(request.headers['Content-Length'])
                self.post_data = request.rfile.read(rlen)
            except Exception:
                # Deliberately best-effort: the response must still be sent
                # and ``handled`` still set, otherwise test_output would
                # wait forever. A missing body then shows up as a failure
                # in the test itself.
                self.post_data = None
        request.send_response(200)
        request.end_headers()
        self.handled.set()

    def test_output(self):
        """Records reach the server via GET and POST, plain and over TLS."""
        # The log message sent to the HTTPHandler is properly received.
        logger = logging.getLogger("http")
        root_logger = self.root_logger
        root_logger.removeHandler(self.root_logger.handlers[0])
        for secure in (False, True):
            addr = ('localhost', 0)
            # Default to a plain connection; both values are replaced below
            # only when a secure pass is requested and ssl is importable.
            sslctx = None
            context = None
            if secure:
                try:
                    import ssl
                except ImportError:
                    pass
                else:
                    here = os.path.dirname(__file__)
                    localhost_cert = os.path.join(here, "keycert.pem")
                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    sslctx.load_cert_chain(localhost_cert)

                    context = ssl.create_default_context(cafile=localhost_cert)
            self.server = server = TestHTTPServer(addr, self.handle_request,
                                                    0.01, sslctx=sslctx)
            server.start()
            server.ready.wait()
            self.h_hdlr = None
            try:
                host = 'localhost:%d' % server.server_port
                secure_client = secure and sslctx
                self.h_hdlr = logging.handlers.HTTPHandler(
                    host, '/frob',
                    secure=secure_client,
                    context=context,
                    credentials=('foo', 'bar'))
                self.log_data = None
                root_logger.addHandler(self.h_hdlr)

                for method in ('GET', 'POST'):
                    self.h_hdlr.method = method
                    self.handled.clear()
                    msg = "sp\xe4m"
                    logger.error(msg)
                    self.handled.wait()
                    self.assertEqual(self.log_data.path, '/frob')
                    self.assertEqual(self.command, method)
                    if method == 'GET':
                        d = parse_qs(self.log_data.query)
                    else:
                        d = parse_qs(self.post_data.decode('utf-8'))
                    self.assertEqual(d['name'], ['http'])
                    self.assertEqual(d['funcName'], ['test_output'])
                    self.assertEqual(d['msg'], [msg])
            finally:
                # Stop the server and detach the handler even when an
                # assertion above fails, so neither outlives this pass.
                self.server.stop(2.0)
                if self.h_hdlr is not None:
                    self.root_logger.removeHandler(self.h_hdlr)
                    self.h_hdlr.close()
2010
class MemoryTest(BaseTest):

    """Test memory persistence of logger objects."""

    def setUp(self):
        """Start each test with an empty registry of watched objects."""
        BaseTest.setUp(self)
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Register a weak reference to every given object.

        Entries are keyed by ``(id, repr)`` so an object that has gone away
        can still be named in a failure message.
        """
        for watched in args:
            self._survivors[id(watched), repr(watched)] = weakref.ref(watched)

    def _assertTruesurvival(self):
        """Fail unless every watched object is still alive."""
        # Trigger cycle breaking.
        gc.collect()
        dead = [repr_
                for (id_, repr_), ref in self._survivors.items()
                if ref() is None]
        if dead:
            self.fail("%d objects should have survived "
                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        #  if visible references are destroyed.
        first_hit = ('foo', 'DEBUG', '2')
        second_hit = ('foo', 'DEBUG', '3')
        self.root_logger.setLevel(logging.INFO)
        original_ref = logging.getLogger("foo")
        self._watch_for_survival(original_ref)
        original_ref.setLevel(logging.DEBUG)
        self.root_logger.debug(self.next_message())
        original_ref.debug(self.next_message())
        self.assert_log_lines([first_hit])
        # Drop the only local reference to the logger.
        del original_ref
        # foo has survived.
        self._assertTruesurvival()
        # foo has retained its settings.
        looked_up_again = logging.getLogger("foo")
        looked_up_again.debug(self.next_message())
        self.assert_log_lines([first_hit, second_hit])
2061
2062
class EncodingTest(BaseTest):
    """Check that handlers encode non-ASCII messages as requested."""

    def test_encoding_plain_file(self):
        """A FileHandler opened as UTF-8 round-trips non-ASCII text."""
        # In Python 2.x, a plain file object is treated as having no encoding.
        test_logger = logging.getLogger("test")
        handle, path = tempfile.mkstemp(suffix=".log",
                                        prefix="test_logging-1-")
        os.close(handle)
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            file_handler = logging.FileHandler(path, encoding="utf-8")
            test_logger.addHandler(file_handler)
            try:
                # write non-ascii data to the log.
                test_logger.warning(data)
            finally:
                test_logger.removeHandler(file_handler)
                file_handler.close()
            # check we wrote exactly those bytes, ignoring trailing \n etc
            with open(path, encoding="utf-8") as written:
                self.assertEqual(written.read().rstrip(), data)
        finally:
            if os.path.isfile(path):
                os.remove(path)

    def test_encoding_cyrillic_unicode(self):
        """A StreamHandler writing through a cp1251 writer emits cp1251."""
        test_logger = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        raw_bytes = io.BytesIO()
        stream_handler = logging.StreamHandler(writer_class(raw_bytes,
                                                            'strict'))
        test_logger.addHandler(stream_handler)
        try:
            test_logger.warning(message)
        finally:
            test_logger.removeHandler(stream_handler)
            stream_handler.close()
        # check we wrote exactly those bytes, ignoring trailing \n etc
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(raw_bytes.getvalue(),
                         b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
2110
2111
class WarningsTest(BaseTest):
    """Check the integration between the warnings and logging modules."""

    def test_warnings(self):
        """Captured warnings are logged; explicit-file warnings are not."""
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)
            warnings.filterwarnings("always", category=UserWarning)
            captured = io.StringIO()
            handler = logging.StreamHandler(captured)
            warn_logger = logging.getLogger("py.warnings")
            warn_logger.addHandler(handler)
            warnings.warn("I'm warning you...")
            warn_logger.removeHandler(handler)
            logged = captured.getvalue()
            handler.close()
            self.assertGreater(
                logged.find("UserWarning: I'm warning you...\n"), 0)

            # See if an explicit file uses the original implementation
            with io.StringIO() as a_file:
                warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                     a_file, "Dummy line")
                written = a_file.getvalue()
            self.assertEqual(written,
                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")

    def test_warnings_no_handlers(self):
        """A warning shown with no handlers installs one NullHandler."""
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)

            # confirm our assumption: no loggers are set
            warn_logger = logging.getLogger("py.warnings")
            self.assertEqual(warn_logger.handlers, [])

            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
            installed = warn_logger.handlers
            self.assertEqual(len(installed), 1)
            self.assertIsInstance(installed[0], logging.NullHandler)
2150
2151
def formatFunc(format, datefmt=None):
    """Build a logging.Formatter from a format string and a date format."""
    formatter = logging.Formatter(fmt=format, datefmt=datefmt)
    return formatter
2154
class myCustomFormatter:
    """Formatter stand-in whose constructor accepts and ignores its arguments."""

    def __init__(self, fmt, datefmt=None):
        """Accept *fmt* and *datefmt* without storing either."""
2158
def handlerFunc():
    """Return a new StreamHandler created with default arguments."""
    handler = logging.StreamHandler()
    return handler
2161
class CustomHandler(logging.StreamHandler):
    """StreamHandler subclass that adds nothing of its own."""
2164
2165class ConfigDictTest(BaseTest):
2166
2167    """Reading logging config from a dictionary."""
2168
2169    check_no_resource_warning = support.check_no_resource_warning
2170    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2171
2172    # config0 is a standard configuration.
2173    config0 = {
2174        'version': 1,
2175        'formatters': {
2176            'form1' : {
2177                'format' : '%(levelname)s ++ %(message)s',
2178            },
2179        },
2180        'handlers' : {
2181            'hand1' : {
2182                'class' : 'logging.StreamHandler',
2183                'formatter' : 'form1',
2184                'level' : 'NOTSET',
2185                'stream'  : 'ext://sys.stdout',
2186            },
2187        },
2188        'root' : {
2189            'level' : 'WARNING',
2190            'handlers' : ['hand1'],
2191        },
2192    }
2193
2194    # config1 adds a little to the standard configuration.
2195    config1 = {
2196        'version': 1,
2197        'formatters': {
2198            'form1' : {
2199                'format' : '%(levelname)s ++ %(message)s',
2200            },
2201        },
2202        'handlers' : {
2203            'hand1' : {
2204                'class' : 'logging.StreamHandler',
2205                'formatter' : 'form1',
2206                'level' : 'NOTSET',
2207                'stream'  : 'ext://sys.stdout',
2208            },
2209        },
2210        'loggers' : {
2211            'compiler.parser' : {
2212                'level' : 'DEBUG',
2213                'handlers' : ['hand1'],
2214            },
2215        },
2216        'root' : {
2217            'level' : 'WARNING',
2218        },
2219    }
2220
2221    # config1a moves the handler to the root. Used with config8a
2222    config1a = {
2223        'version': 1,
2224        'formatters': {
2225            'form1' : {
2226                'format' : '%(levelname)s ++ %(message)s',
2227            },
2228        },
2229        'handlers' : {
2230            'hand1' : {
2231                'class' : 'logging.StreamHandler',
2232                'formatter' : 'form1',
2233                'level' : 'NOTSET',
2234                'stream'  : 'ext://sys.stdout',
2235            },
2236        },
2237        'loggers' : {
2238            'compiler.parser' : {
2239                'level' : 'DEBUG',
2240            },
2241        },
2242        'root' : {
2243            'level' : 'WARNING',
2244            'handlers' : ['hand1'],
2245        },
2246    }
2247
2248    # config2 has a subtle configuration error that should be reported
2249    config2 = {
2250        'version': 1,
2251        'formatters': {
2252            'form1' : {
2253                'format' : '%(levelname)s ++ %(message)s',
2254            },
2255        },
2256        'handlers' : {
2257            'hand1' : {
2258                'class' : 'logging.StreamHandler',
2259                'formatter' : 'form1',
2260                'level' : 'NOTSET',
2261                'stream'  : 'ext://sys.stdbout',
2262            },
2263        },
2264        'loggers' : {
2265            'compiler.parser' : {
2266                'level' : 'DEBUG',
2267                'handlers' : ['hand1'],
2268            },
2269        },
2270        'root' : {
2271            'level' : 'WARNING',
2272        },
2273    }
2274
2275    # As config1 but with a misspelt level on a handler
2276    config2a = {
2277        'version': 1,
2278        'formatters': {
2279            'form1' : {
2280                'format' : '%(levelname)s ++ %(message)s',
2281            },
2282        },
2283        'handlers' : {
2284            'hand1' : {
2285                'class' : 'logging.StreamHandler',
2286                'formatter' : 'form1',
2287                'level' : 'NTOSET',
2288                'stream'  : 'ext://sys.stdout',
2289            },
2290        },
2291        'loggers' : {
2292            'compiler.parser' : {
2293                'level' : 'DEBUG',
2294                'handlers' : ['hand1'],
2295            },
2296        },
2297        'root' : {
2298            'level' : 'WARNING',
2299        },
2300    }
2301
2302
2303    # As config1 but with a misspelt level on a logger
2304    config2b = {
2305        'version': 1,
2306        'formatters': {
2307            'form1' : {
2308                'format' : '%(levelname)s ++ %(message)s',
2309            },
2310        },
2311        'handlers' : {
2312            'hand1' : {
2313                'class' : 'logging.StreamHandler',
2314                'formatter' : 'form1',
2315                'level' : 'NOTSET',
2316                'stream'  : 'ext://sys.stdout',
2317            },
2318        },
2319        'loggers' : {
2320            'compiler.parser' : {
2321                'level' : 'DEBUG',
2322                'handlers' : ['hand1'],
2323            },
2324        },
2325        'root' : {
2326            'level' : 'WRANING',
2327        },
2328    }
2329
2330    # config3 has a less subtle configuration error
2331    config3 = {
2332        'version': 1,
2333        'formatters': {
2334            'form1' : {
2335                'format' : '%(levelname)s ++ %(message)s',
2336            },
2337        },
2338        'handlers' : {
2339            'hand1' : {
2340                'class' : 'logging.StreamHandler',
2341                'formatter' : 'misspelled_name',
2342                'level' : 'NOTSET',
2343                'stream'  : 'ext://sys.stdout',
2344            },
2345        },
2346        'loggers' : {
2347            'compiler.parser' : {
2348                'level' : 'DEBUG',
2349                'handlers' : ['hand1'],
2350            },
2351        },
2352        'root' : {
2353            'level' : 'WARNING',
2354        },
2355    }
2356
2357    # config4 specifies a custom formatter class to be loaded
2358    config4 = {
2359        'version': 1,
2360        'formatters': {
2361            'form1' : {
2362                '()' : __name__ + '.ExceptionFormatter',
2363                'format' : '%(levelname)s:%(name)s:%(message)s',
2364            },
2365        },
2366        'handlers' : {
2367            'hand1' : {
2368                'class' : 'logging.StreamHandler',
2369                'formatter' : 'form1',
2370                'level' : 'NOTSET',
2371                'stream'  : 'ext://sys.stdout',
2372            },
2373        },
2374        'root' : {
2375            'level' : 'NOTSET',
2376                'handlers' : ['hand1'],
2377        },
2378    }
2379
2380    # As config4 but using an actual callable rather than a string
2381    config4a = {
2382        'version': 1,
2383        'formatters': {
2384            'form1' : {
2385                '()' : ExceptionFormatter,
2386                'format' : '%(levelname)s:%(name)s:%(message)s',
2387            },
2388            'form2' : {
2389                '()' : __name__ + '.formatFunc',
2390                'format' : '%(levelname)s:%(name)s:%(message)s',
2391            },
2392            'form3' : {
2393                '()' : formatFunc,
2394                'format' : '%(levelname)s:%(name)s:%(message)s',
2395            },
2396        },
2397        'handlers' : {
2398            'hand1' : {
2399                'class' : 'logging.StreamHandler',
2400                'formatter' : 'form1',
2401                'level' : 'NOTSET',
2402                'stream'  : 'ext://sys.stdout',
2403            },
2404            'hand2' : {
2405                '()' : handlerFunc,
2406            },
2407        },
2408        'root' : {
2409            'level' : 'NOTSET',
2410                'handlers' : ['hand1'],
2411        },
2412    }
2413
2414    # config5 specifies a custom handler class to be loaded
2415    config5 = {
2416        'version': 1,
2417        'formatters': {
2418            'form1' : {
2419                'format' : '%(levelname)s ++ %(message)s',
2420            },
2421        },
2422        'handlers' : {
2423            'hand1' : {
2424                'class' : __name__ + '.CustomHandler',
2425                'formatter' : 'form1',
2426                'level' : 'NOTSET',
2427                'stream'  : 'ext://sys.stdout',
2428            },
2429        },
2430        'loggers' : {
2431            'compiler.parser' : {
2432                'level' : 'DEBUG',
2433                'handlers' : ['hand1'],
2434            },
2435        },
2436        'root' : {
2437            'level' : 'WARNING',
2438        },
2439    }
2440
2441    # config6 specifies a custom handler class to be loaded
2442    # but has bad arguments
2443    config6 = {
2444        'version': 1,
2445        'formatters': {
2446            'form1' : {
2447                'format' : '%(levelname)s ++ %(message)s',
2448            },
2449        },
2450        'handlers' : {
2451            'hand1' : {
2452                'class' : __name__ + '.CustomHandler',
2453                'formatter' : 'form1',
2454                'level' : 'NOTSET',
2455                'stream'  : 'ext://sys.stdout',
2456                '9' : 'invalid parameter name',
2457            },
2458        },
2459        'loggers' : {
2460            'compiler.parser' : {
2461                'level' : 'DEBUG',
2462                'handlers' : ['hand1'],
2463            },
2464        },
2465        'root' : {
2466            'level' : 'WARNING',
2467        },
2468    }
2469
2470    # config 7 does not define compiler.parser but defines compiler.lexer
2471    # so compiler.parser should be disabled after applying it
2472    config7 = {
2473        'version': 1,
2474        'formatters': {
2475            'form1' : {
2476                'format' : '%(levelname)s ++ %(message)s',
2477            },
2478        },
2479        'handlers' : {
2480            'hand1' : {
2481                'class' : 'logging.StreamHandler',
2482                'formatter' : 'form1',
2483                'level' : 'NOTSET',
2484                'stream'  : 'ext://sys.stdout',
2485            },
2486        },
2487        'loggers' : {
2488            'compiler.lexer' : {
2489                'level' : 'DEBUG',
2490                'handlers' : ['hand1'],
2491            },
2492        },
2493        'root' : {
2494            'level' : 'WARNING',
2495        },
2496    }
2497
2498    # config8 defines both compiler and compiler.lexer
2499    # so compiler.parser should not be disabled (since
2500    # compiler is defined)
2501    config8 = {
2502        'version': 1,
2503        'disable_existing_loggers' : False,
2504        'formatters': {
2505            'form1' : {
2506                'format' : '%(levelname)s ++ %(message)s',
2507            },
2508        },
2509        'handlers' : {
2510            'hand1' : {
2511                'class' : 'logging.StreamHandler',
2512                'formatter' : 'form1',
2513                'level' : 'NOTSET',
2514                'stream'  : 'ext://sys.stdout',
2515            },
2516        },
2517        'loggers' : {
2518            'compiler' : {
2519                'level' : 'DEBUG',
2520                'handlers' : ['hand1'],
2521            },
2522            'compiler.lexer' : {
2523            },
2524        },
2525        'root' : {
2526            'level' : 'WARNING',
2527        },
2528    }
2529
2530    # config8a disables existing loggers
2531    config8a = {
2532        'version': 1,
2533        'disable_existing_loggers' : True,
2534        'formatters': {
2535            'form1' : {
2536                'format' : '%(levelname)s ++ %(message)s',
2537            },
2538        },
2539        'handlers' : {
2540            'hand1' : {
2541                'class' : 'logging.StreamHandler',
2542                'formatter' : 'form1',
2543                'level' : 'NOTSET',
2544                'stream'  : 'ext://sys.stdout',
2545            },
2546        },
2547        'loggers' : {
2548            'compiler' : {
2549                'level' : 'DEBUG',
2550                'handlers' : ['hand1'],
2551            },
2552            'compiler.lexer' : {
2553            },
2554        },
2555        'root' : {
2556            'level' : 'WARNING',
2557        },
2558    }
2559
2560    config9 = {
2561        'version': 1,
2562        'formatters': {
2563            'form1' : {
2564                'format' : '%(levelname)s ++ %(message)s',
2565            },
2566        },
2567        'handlers' : {
2568            'hand1' : {
2569                'class' : 'logging.StreamHandler',
2570                'formatter' : 'form1',
2571                'level' : 'WARNING',
2572                'stream'  : 'ext://sys.stdout',
2573            },
2574        },
2575        'loggers' : {
2576            'compiler.parser' : {
2577                'level' : 'WARNING',
2578                'handlers' : ['hand1'],
2579            },
2580        },
2581        'root' : {
2582            'level' : 'NOTSET',
2583        },
2584    }
2585
2586    config9a = {
2587        'version': 1,
2588        'incremental' : True,
2589        'handlers' : {
2590            'hand1' : {
2591                'level' : 'WARNING',
2592            },
2593        },
2594        'loggers' : {
2595            'compiler.parser' : {
2596                'level' : 'INFO',
2597            },
2598        },
2599    }
2600
2601    config9b = {
2602        'version': 1,
2603        'incremental' : True,
2604        'handlers' : {
2605            'hand1' : {
2606                'level' : 'INFO',
2607            },
2608        },
2609        'loggers' : {
2610            'compiler.parser' : {
2611                'level' : 'INFO',
2612            },
2613        },
2614    }
2615
2616    # As config1 but with a filter added
2617    config10 = {
2618        'version': 1,
2619        'formatters': {
2620            'form1' : {
2621                'format' : '%(levelname)s ++ %(message)s',
2622            },
2623        },
2624        'filters' : {
2625            'filt1' : {
2626                'name' : 'compiler.parser',
2627            },
2628        },
2629        'handlers' : {
2630            'hand1' : {
2631                'class' : 'logging.StreamHandler',
2632                'formatter' : 'form1',
2633                'level' : 'NOTSET',
2634                'stream'  : 'ext://sys.stdout',
2635                'filters' : ['filt1'],
2636            },
2637        },
2638        'loggers' : {
2639            'compiler.parser' : {
2640                'level' : 'DEBUG',
2641                'filters' : ['filt1'],
2642            },
2643        },
2644        'root' : {
2645            'level' : 'WARNING',
2646            'handlers' : ['hand1'],
2647        },
2648    }
2649
2650    # As config1 but using cfg:// references
2651    config11 = {
2652        'version': 1,
2653        'true_formatters': {
2654            'form1' : {
2655                'format' : '%(levelname)s ++ %(message)s',
2656            },
2657        },
2658        'handler_configs': {
2659            'hand1' : {
2660                'class' : 'logging.StreamHandler',
2661                'formatter' : 'form1',
2662                'level' : 'NOTSET',
2663                'stream'  : 'ext://sys.stdout',
2664            },
2665        },
2666        'formatters' : 'cfg://true_formatters',
2667        'handlers' : {
2668            'hand1' : 'cfg://handler_configs[hand1]',
2669        },
2670        'loggers' : {
2671            'compiler.parser' : {
2672                'level' : 'DEBUG',
2673                'handlers' : ['hand1'],
2674            },
2675        },
2676        'root' : {
2677            'level' : 'WARNING',
2678        },
2679    }
2680
2681    # As config11 but missing the version key
2682    config12 = {
2683        'true_formatters': {
2684            'form1' : {
2685                'format' : '%(levelname)s ++ %(message)s',
2686            },
2687        },
2688        'handler_configs': {
2689            'hand1' : {
2690                'class' : 'logging.StreamHandler',
2691                'formatter' : 'form1',
2692                'level' : 'NOTSET',
2693                'stream'  : 'ext://sys.stdout',
2694            },
2695        },
2696        'formatters' : 'cfg://true_formatters',
2697        'handlers' : {
2698            'hand1' : 'cfg://handler_configs[hand1]',
2699        },
2700        'loggers' : {
2701            'compiler.parser' : {
2702                'level' : 'DEBUG',
2703                'handlers' : ['hand1'],
2704            },
2705        },
2706        'root' : {
2707            'level' : 'WARNING',
2708        },
2709    }
2710
2711    # As config11 but using an unsupported version
2712    config13 = {
2713        'version': 2,
2714        'true_formatters': {
2715            'form1' : {
2716                'format' : '%(levelname)s ++ %(message)s',
2717            },
2718        },
2719        'handler_configs': {
2720            'hand1' : {
2721                'class' : 'logging.StreamHandler',
2722                'formatter' : 'form1',
2723                'level' : 'NOTSET',
2724                'stream'  : 'ext://sys.stdout',
2725            },
2726        },
2727        'formatters' : 'cfg://true_formatters',
2728        'handlers' : {
2729            'hand1' : 'cfg://handler_configs[hand1]',
2730        },
2731        'loggers' : {
2732            'compiler.parser' : {
2733                'level' : 'DEBUG',
2734                'handlers' : ['hand1'],
2735            },
2736        },
2737        'root' : {
2738            'level' : 'WARNING',
2739        },
2740    }
2741
2742    # As config0, but with properties
2743    config14 = {
2744        'version': 1,
2745        'formatters': {
2746            'form1' : {
2747                'format' : '%(levelname)s ++ %(message)s',
2748            },
2749        },
2750        'handlers' : {
2751            'hand1' : {
2752                'class' : 'logging.StreamHandler',
2753                'formatter' : 'form1',
2754                'level' : 'NOTSET',
2755                'stream'  : 'ext://sys.stdout',
2756                '.': {
2757                    'foo': 'bar',
2758                    'terminator': '!\n',
2759                }
2760            },
2761        },
2762        'root' : {
2763            'level' : 'WARNING',
2764            'handlers' : ['hand1'],
2765        },
2766    }
2767
2768    out_of_order = {
2769        "version": 1,
2770        "formatters": {
2771            "mySimpleFormatter": {
2772                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2773                "style": "$"
2774            }
2775        },
2776        "handlers": {
2777            "fileGlobal": {
2778                "class": "logging.StreamHandler",
2779                "level": "DEBUG",
2780                "formatter": "mySimpleFormatter"
2781            },
2782            "bufferGlobal": {
2783                "class": "logging.handlers.MemoryHandler",
2784                "capacity": 5,
2785                "formatter": "mySimpleFormatter",
2786                "target": "fileGlobal",
2787                "level": "DEBUG"
2788                }
2789        },
2790        "loggers": {
2791            "mymodule": {
2792                "level": "DEBUG",
2793                "handlers": ["bufferGlobal"],
2794                "propagate": "true"
2795            }
2796        }
2797    }
2798
2799    # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False
2800    custom_formatter_class_validate = {
2801        'version': 1,
2802        'formatters': {
2803            'form1': {
2804                '()': __name__ + '.ExceptionFormatter',
2805                'format': '%(levelname)s:%(name)s:%(message)s',
2806                'validate': False,
2807            },
2808        },
2809        'handlers' : {
2810            'hand1' : {
2811                'class': 'logging.StreamHandler',
2812                'formatter': 'form1',
2813                'level': 'NOTSET',
2814                'stream': 'ext://sys.stdout',
2815            },
2816        },
2817        "loggers": {
2818            "my_test_logger_custom_formatter": {
2819                "level": "DEBUG",
2820                "handlers": ["hand1"],
2821                "propagate": "true"
2822            }
2823        }
2824    }
2825
2826    # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False
2827    custom_formatter_class_validate2 = {
2828        'version': 1,
2829        'formatters': {
2830            'form1': {
2831                'class': __name__ + '.ExceptionFormatter',
2832                'format': '%(levelname)s:%(name)s:%(message)s',
2833                'validate': False,
2834            },
2835        },
2836        'handlers' : {
2837            'hand1' : {
2838                'class': 'logging.StreamHandler',
2839                'formatter': 'form1',
2840                'level': 'NOTSET',
2841                'stream': 'ext://sys.stdout',
2842            },
2843        },
2844        "loggers": {
2845            "my_test_logger_custom_formatter": {
2846                "level": "DEBUG",
2847                "handlers": ["hand1"],
2848                "propagate": "true"
2849            }
2850        }
2851    }
2852
2853    # Configuration with custom class that is not inherited from logging.Formatter
2854    custom_formatter_class_validate3 = {
2855        'version': 1,
2856        'formatters': {
2857            'form1': {
2858                'class': __name__ + '.myCustomFormatter',
2859                'format': '%(levelname)s:%(name)s:%(message)s',
2860                'validate': False,
2861            },
2862        },
2863        'handlers' : {
2864            'hand1' : {
2865                'class': 'logging.StreamHandler',
2866                'formatter': 'form1',
2867                'level': 'NOTSET',
2868                'stream': 'ext://sys.stdout',
2869            },
2870        },
2871        "loggers": {
2872            "my_test_logger_custom_formatter": {
2873                "level": "DEBUG",
2874                "handlers": ["hand1"],
2875                "propagate": "true"
2876            }
2877        }
2878    }
2879
2880    # Configuration with custom function and 'validate' set to False
2881    custom_formatter_with_function = {
2882        'version': 1,
2883        'formatters': {
2884            'form1': {
2885                '()': formatFunc,
2886                'format': '%(levelname)s:%(name)s:%(message)s',
2887                'validate': False,
2888            },
2889        },
2890        'handlers' : {
2891            'hand1' : {
2892                'class': 'logging.StreamHandler',
2893                'formatter': 'form1',
2894                'level': 'NOTSET',
2895                'stream': 'ext://sys.stdout',
2896            },
2897        },
2898        "loggers": {
2899            "my_test_logger_custom_formatter": {
2900                "level": "DEBUG",
2901                "handlers": ["hand1"],
2902                "propagate": "true"
2903            }
2904        }
2905    }
2906
2907    def apply_config(self, conf):
2908        logging.config.dictConfig(conf)
2909
2910    def test_config0_ok(self):
2911        # A simple config which overrides the default settings.
2912        with support.captured_stdout() as output:
2913            self.apply_config(self.config0)
2914            logger = logging.getLogger()
2915            # Won't output anything
2916            logger.info(self.next_message())
2917            # Outputs a message
2918            logger.error(self.next_message())
2919            self.assert_log_lines([
2920                ('ERROR', '2'),
2921            ], stream=output)
2922            # Original logger output is empty.
2923            self.assert_log_lines([])
2924
2925    def test_config1_ok(self, config=config1):
2926        # A config defining a sub-parser as well.
2927        with support.captured_stdout() as output:
2928            self.apply_config(config)
2929            logger = logging.getLogger("compiler.parser")
2930            # Both will output a message
2931            logger.info(self.next_message())
2932            logger.error(self.next_message())
2933            self.assert_log_lines([
2934                ('INFO', '1'),
2935                ('ERROR', '2'),
2936            ], stream=output)
2937            # Original logger output is empty.
2938            self.assert_log_lines([])
2939
2940    def test_config2_failure(self):
2941        # A simple config which overrides the default settings.
2942        self.assertRaises(Exception, self.apply_config, self.config2)
2943
2944    def test_config2a_failure(self):
2945        # A simple config which overrides the default settings.
2946        self.assertRaises(Exception, self.apply_config, self.config2a)
2947
2948    def test_config2b_failure(self):
2949        # A simple config which overrides the default settings.
2950        self.assertRaises(Exception, self.apply_config, self.config2b)
2951
2952    def test_config3_failure(self):
2953        # A simple config which overrides the default settings.
2954        self.assertRaises(Exception, self.apply_config, self.config3)
2955
2956    def test_config4_ok(self):
2957        # A config specifying a custom formatter class.
2958        with support.captured_stdout() as output:
2959            self.apply_config(self.config4)
2960            #logger = logging.getLogger()
2961            try:
2962                raise RuntimeError()
2963            except RuntimeError:
2964                logging.exception("just testing")
2965            sys.stdout.seek(0)
2966            self.assertEqual(output.getvalue(),
2967                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2968            # Original logger output is empty
2969            self.assert_log_lines([])
2970
2971    def test_config4a_ok(self):
2972        # A config specifying a custom formatter class.
2973        with support.captured_stdout() as output:
2974            self.apply_config(self.config4a)
2975            #logger = logging.getLogger()
2976            try:
2977                raise RuntimeError()
2978            except RuntimeError:
2979                logging.exception("just testing")
2980            sys.stdout.seek(0)
2981            self.assertEqual(output.getvalue(),
2982                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2983            # Original logger output is empty
2984            self.assert_log_lines([])
2985
2986    def test_config5_ok(self):
2987        self.test_config1_ok(config=self.config5)
2988
2989    def test_config6_failure(self):
2990        self.assertRaises(Exception, self.apply_config, self.config6)
2991
2992    def test_config7_ok(self):
2993        with support.captured_stdout() as output:
2994            self.apply_config(self.config1)
2995            logger = logging.getLogger("compiler.parser")
2996            # Both will output a message
2997            logger.info(self.next_message())
2998            logger.error(self.next_message())
2999            self.assert_log_lines([
3000                ('INFO', '1'),
3001                ('ERROR', '2'),
3002            ], stream=output)
3003            # Original logger output is empty.
3004            self.assert_log_lines([])
3005        with support.captured_stdout() as output:
3006            self.apply_config(self.config7)
3007            logger = logging.getLogger("compiler.parser")
3008            self.assertTrue(logger.disabled)
3009            logger = logging.getLogger("compiler.lexer")
3010            # Both will output a message
3011            logger.info(self.next_message())
3012            logger.error(self.next_message())
3013            self.assert_log_lines([
3014                ('INFO', '3'),
3015                ('ERROR', '4'),
3016            ], stream=output)
3017            # Original logger output is empty.
3018            self.assert_log_lines([])
3019
3020    # Same as test_config_7_ok but don't disable old loggers.
3021    def test_config_8_ok(self):
3022        with support.captured_stdout() as output:
3023            self.apply_config(self.config1)
3024            logger = logging.getLogger("compiler.parser")
3025            # All will output a message
3026            logger.info(self.next_message())
3027            logger.error(self.next_message())
3028            self.assert_log_lines([
3029                ('INFO', '1'),
3030                ('ERROR', '2'),
3031            ], stream=output)
3032            # Original logger output is empty.
3033            self.assert_log_lines([])
3034        with support.captured_stdout() as output:
3035            self.apply_config(self.config8)
3036            logger = logging.getLogger("compiler.parser")
3037            self.assertFalse(logger.disabled)
3038            # Both will output a message
3039            logger.info(self.next_message())
3040            logger.error(self.next_message())
3041            logger = logging.getLogger("compiler.lexer")
3042            # Both will output a message
3043            logger.info(self.next_message())
3044            logger.error(self.next_message())
3045            self.assert_log_lines([
3046                ('INFO', '3'),
3047                ('ERROR', '4'),
3048                ('INFO', '5'),
3049                ('ERROR', '6'),
3050            ], stream=output)
3051            # Original logger output is empty.
3052            self.assert_log_lines([])
3053
3054    def test_config_8a_ok(self):
3055        with support.captured_stdout() as output:
3056            self.apply_config(self.config1a)
3057            logger = logging.getLogger("compiler.parser")
3058            # See issue #11424. compiler-hyphenated sorts
3059            # between compiler and compiler.xyz and this
3060            # was preventing compiler.xyz from being included
3061            # in the child loggers of compiler because of an
3062            # overzealous loop termination condition.
3063            hyphenated = logging.getLogger('compiler-hyphenated')
3064            # All will output a message
3065            logger.info(self.next_message())
3066            logger.error(self.next_message())
3067            hyphenated.critical(self.next_message())
3068            self.assert_log_lines([
3069                ('INFO', '1'),
3070                ('ERROR', '2'),
3071                ('CRITICAL', '3'),
3072            ], stream=output)
3073            # Original logger output is empty.
3074            self.assert_log_lines([])
3075        with support.captured_stdout() as output:
3076            self.apply_config(self.config8a)
3077            logger = logging.getLogger("compiler.parser")
3078            self.assertFalse(logger.disabled)
3079            # Both will output a message
3080            logger.info(self.next_message())
3081            logger.error(self.next_message())
3082            logger = logging.getLogger("compiler.lexer")
3083            # Both will output a message
3084            logger.info(self.next_message())
3085            logger.error(self.next_message())
3086            # Will not appear
3087            hyphenated.critical(self.next_message())
3088            self.assert_log_lines([
3089                ('INFO', '4'),
3090                ('ERROR', '5'),
3091                ('INFO', '6'),
3092                ('ERROR', '7'),
3093            ], stream=output)
3094            # Original logger output is empty.
3095            self.assert_log_lines([])
3096
3097    def test_config_9_ok(self):
3098        with support.captured_stdout() as output:
3099            self.apply_config(self.config9)
3100            logger = logging.getLogger("compiler.parser")
3101            # Nothing will be output since both handler and logger are set to WARNING
3102            logger.info(self.next_message())
3103            self.assert_log_lines([], stream=output)
3104            self.apply_config(self.config9a)
3105            # Nothing will be output since handler is still set to WARNING
3106            logger.info(self.next_message())
3107            self.assert_log_lines([], stream=output)
3108            self.apply_config(self.config9b)
3109            # Message should now be output
3110            logger.info(self.next_message())
3111            self.assert_log_lines([
3112                ('INFO', '3'),
3113            ], stream=output)
3114
3115    def test_config_10_ok(self):
3116        with support.captured_stdout() as output:
3117            self.apply_config(self.config10)
3118            logger = logging.getLogger("compiler.parser")
3119            logger.warning(self.next_message())
3120            logger = logging.getLogger('compiler')
3121            # Not output, because filtered
3122            logger.warning(self.next_message())
3123            logger = logging.getLogger('compiler.lexer')
3124            # Not output, because filtered
3125            logger.warning(self.next_message())
3126            logger = logging.getLogger("compiler.parser.codegen")
3127            # Output, as not filtered
3128            logger.error(self.next_message())
3129            self.assert_log_lines([
3130                ('WARNING', '1'),
3131                ('ERROR', '4'),
3132            ], stream=output)
3133
3134    def test_config11_ok(self):
3135        self.test_config1_ok(self.config11)
3136
3137    def test_config12_failure(self):
3138        self.assertRaises(Exception, self.apply_config, self.config12)
3139
3140    def test_config13_failure(self):
3141        self.assertRaises(Exception, self.apply_config, self.config13)
3142
3143    def test_config14_ok(self):
3144        with support.captured_stdout() as output:
3145            self.apply_config(self.config14)
3146            h = logging._handlers['hand1']
3147            self.assertEqual(h.foo, 'bar')
3148            self.assertEqual(h.terminator, '!\n')
3149            logging.warning('Exclamation')
3150            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3151
3152    def test_config15_ok(self):
3153
3154        def cleanup(h1, fn):
3155            h1.close()
3156            os.remove(fn)
3157
3158        with self.check_no_resource_warning():
3159            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
3160            os.close(fd)
3161
3162            config = {
3163                "version": 1,
3164                "handlers": {
3165                    "file": {
3166                        "class": "logging.FileHandler",
3167                        "filename": fn
3168                    }
3169                },
3170                "root": {
3171                    "handlers": ["file"]
3172                }
3173            }
3174
3175            self.apply_config(config)
3176            self.apply_config(config)
3177
3178        handler = logging.root.handlers[0]
3179        self.addCleanup(cleanup, handler, fn)
3180
3181    def setup_via_listener(self, text, verify=None):
3182        text = text.encode("utf-8")
3183        # Ask for a randomly assigned port (by using port 0)
3184        t = logging.config.listen(0, verify)
3185        t.start()
3186        t.ready.wait()
3187        # Now get the port allocated
3188        port = t.port
3189        t.ready.clear()
3190        try:
3191            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3192            sock.settimeout(2.0)
3193            sock.connect(('localhost', port))
3194
3195            slen = struct.pack('>L', len(text))
3196            s = slen + text
3197            sentsofar = 0
3198            left = len(s)
3199            while left > 0:
3200                sent = sock.send(s[sentsofar:])
3201                sentsofar += sent
3202                left -= sent
3203            sock.close()
3204        finally:
3205            t.ready.wait(2.0)
3206            logging.config.stopListening()
3207            support.join_thread(t, 2.0)
3208
3209    def test_listen_config_10_ok(self):
3210        with support.captured_stdout() as output:
3211            self.setup_via_listener(json.dumps(self.config10))
3212            logger = logging.getLogger("compiler.parser")
3213            logger.warning(self.next_message())
3214            logger = logging.getLogger('compiler')
3215            # Not output, because filtered
3216            logger.warning(self.next_message())
3217            logger = logging.getLogger('compiler.lexer')
3218            # Not output, because filtered
3219            logger.warning(self.next_message())
3220            logger = logging.getLogger("compiler.parser.codegen")
3221            # Output, as not filtered
3222            logger.error(self.next_message())
3223            self.assert_log_lines([
3224                ('WARNING', '1'),
3225                ('ERROR', '4'),
3226            ], stream=output)
3227
3228    def test_listen_config_1_ok(self):
3229        with support.captured_stdout() as output:
3230            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
3231            logger = logging.getLogger("compiler.parser")
3232            # Both will output a message
3233            logger.info(self.next_message())
3234            logger.error(self.next_message())
3235            self.assert_log_lines([
3236                ('INFO', '1'),
3237                ('ERROR', '2'),
3238            ], stream=output)
3239            # Original logger output is empty.
3240            self.assert_log_lines([])
3241
3242    def test_listen_verify(self):
3243
3244        def verify_fail(stuff):
3245            return None
3246
3247        def verify_reverse(stuff):
3248            return stuff[::-1]
3249
3250        logger = logging.getLogger("compiler.parser")
3251        to_send = textwrap.dedent(ConfigFileTest.config1)
3252        # First, specify a verification function that will fail.
3253        # We expect to see no output, since our configuration
3254        # never took effect.
3255        with support.captured_stdout() as output:
3256            self.setup_via_listener(to_send, verify_fail)
3257            # Both will output a message
3258            logger.info(self.next_message())
3259            logger.error(self.next_message())
3260        self.assert_log_lines([], stream=output)
3261        # Original logger output has the stuff we logged.
3262        self.assert_log_lines([
3263            ('INFO', '1'),
3264            ('ERROR', '2'),
3265        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3266
3267        # Now, perform no verification. Our configuration
3268        # should take effect.
3269
3270        with support.captured_stdout() as output:
3271            self.setup_via_listener(to_send)    # no verify callable specified
3272            logger = logging.getLogger("compiler.parser")
3273            # Both will output a message
3274            logger.info(self.next_message())
3275            logger.error(self.next_message())
3276        self.assert_log_lines([
3277            ('INFO', '3'),
3278            ('ERROR', '4'),
3279        ], stream=output)
3280        # Original logger output still has the stuff we logged before.
3281        self.assert_log_lines([
3282            ('INFO', '1'),
3283            ('ERROR', '2'),
3284        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3285
3286        # Now, perform verification which transforms the bytes.
3287
3288        with support.captured_stdout() as output:
3289            self.setup_via_listener(to_send[::-1], verify_reverse)
3290            logger = logging.getLogger("compiler.parser")
3291            # Both will output a message
3292            logger.info(self.next_message())
3293            logger.error(self.next_message())
3294        self.assert_log_lines([
3295            ('INFO', '5'),
3296            ('ERROR', '6'),
3297        ], stream=output)
3298        # Original logger output still has the stuff we logged before.
3299        self.assert_log_lines([
3300            ('INFO', '1'),
3301            ('ERROR', '2'),
3302        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3303
3304    def test_out_of_order(self):
3305        self.assertRaises(ValueError, self.apply_config, self.out_of_order)
3306
3307    def test_out_of_order_with_dollar_style(self):
3308        config = copy.deepcopy(self.out_of_order)
3309        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"
3310
3311        self.apply_config(config)
3312        handler = logging.getLogger('mymodule').handlers[0]
3313        self.assertIsInstance(handler.target, logging.Handler)
3314        self.assertIsInstance(handler.formatter._style,
3315                              logging.StringTemplateStyle)
3316
3317    def test_custom_formatter_class_with_validate(self):
3318        self.apply_config(self.custom_formatter_class_validate)
3319        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3320        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3321
3322    def test_custom_formatter_class_with_validate2(self):
3323        self.apply_config(self.custom_formatter_class_validate2)
3324        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3325        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3326
3327    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
3328        config = self.custom_formatter_class_validate.copy()
3329        config['formatters']['form1']['style'] = "$"
3330
3331        # Exception should not be raise as we have configured 'validate' to False
3332        self.apply_config(config)
3333        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3334        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3335
3336    def test_custom_formatter_class_with_validate3(self):
3337        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)
3338
3339    def test_custom_formatter_function_with_validate(self):
3340        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)
3341
3342    def test_baseconfig(self):
3343        d = {
3344            'atuple': (1, 2, 3),
3345            'alist': ['a', 'b', 'c'],
3346            'adict': {'d': 'e', 'f': 3 },
3347            'nest1': ('g', ('h', 'i'), 'j'),
3348            'nest2': ['k', ['l', 'm'], 'n'],
3349            'nest3': ['o', 'cfg://alist', 'p'],
3350        }
3351        bc = logging.config.BaseConfigurator(d)
3352        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
3353        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
3354        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
3355        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
3356        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
3357        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
3358        v = bc.convert('cfg://nest3')
3359        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
3360        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
3361        self.assertRaises(ValueError, bc.convert, 'cfg://!')
3362        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
3363
3364    def test_namedtuple(self):
3365        # see bpo-39142
3366        from collections import namedtuple
3367
3368        class MyHandler(logging.StreamHandler):
3369            def __init__(self, resource, *args, **kwargs):
3370                super().__init__(*args, **kwargs)
3371                self.resource: namedtuple = resource
3372
3373            def emit(self, record):
3374                record.msg += f' {self.resource.type}'
3375                return super().emit(record)
3376
3377        Resource = namedtuple('Resource', ['type', 'labels'])
3378        resource = Resource(type='my_type', labels=['a'])
3379
3380        config = {
3381            'version': 1,
3382            'handlers': {
3383                'myhandler': {
3384                    '()': MyHandler,
3385                    'resource': resource
3386                }
3387            },
3388            'root':  {'level': 'INFO', 'handlers': ['myhandler']},
3389        }
3390        with support.captured_stderr() as stderr:
3391            self.apply_config(config)
3392            logging.info('some log')
3393        self.assertEqual(stderr.getvalue(), 'some log my_type\n')
3394
class ManagerTest(BaseTest):
    """Tests for logging.Manager's logger class and record factory setters."""

    def test_manager_loggerclass(self):
        """A logger class set on a Manager is used only for that manager's loggers."""
        captured = []

        class MyLogger(logging.Logger):
            # Record the message instead of dispatching it.
            def _log(self, level, msg, args, exc_info=None, extra=None):
                captured.append(msg)

        manager = logging.Manager(None)
        # A class that is not a Logger subclass is rejected.
        with self.assertRaises(TypeError):
            manager.setLoggerClass(int)
        manager.setLoggerClass(MyLogger)
        custom_logger = manager.getLogger('test')
        custom_logger.warning('should appear in logged')
        logging.warning('should not appear in logged')

        expected = ['should appear in logged']
        self.assertEqual(captured, expected)

    def test_set_log_record_factory(self):
        """setLogRecordFactory() stores the given object as logRecordFactory."""
        sentinel = object()
        manager = logging.Manager(None)
        manager.setLogRecordFactory(sentinel)
        self.assertEqual(manager.logRecordFactory, sentinel)
3417
class ChildLoggerTest(BaseTest):
    """Tests for Logger.getChild()."""

    def test_child_loggers(self):
        """getChild() returns the same objects as getLogger() with the dotted name."""
        root = logging.getLogger()
        abc_logger = logging.getLogger('abc')
        logging.getLogger('def.ghi')

        # Children of the root logger use the suffix as their full name.
        xyz = root.getChild('xyz')
        uvw_xyz = root.getChild('uvw.xyz')
        self.assertIs(xyz, logging.getLogger('xyz'))
        self.assertIs(uvw_xyz, logging.getLogger('uvw.xyz'))

        # Children of a named logger are prefixed with the parent's name.
        abc_def = abc_logger.getChild('def')
        via_two_steps = abc_def.getChild('ghi')
        via_dotted = abc_logger.getChild('def.ghi')
        self.assertIs(abc_def, logging.getLogger('abc.def'))
        self.assertIs(via_two_steps, logging.getLogger('abc.def.ghi'))
        self.assertIs(via_two_steps, via_dotted)
3433
3434
class DerivedLogRecord(logging.LogRecord):
    """Trivial LogRecord subclass that adds no behaviour of its own."""
3437
class LogRecordFactoryTest(BaseTest):

    def setUp(self):
        class CheckingFilter(logging.Filter):
            # Raises TypeError for any record whose exact type is not ``cls``.
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                actual = type(record)
                if actual is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (actual, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        # Remembered so that tearDown() can put it back.
        self.orig_factory = logging.getLogRecordFactory()

    def tearDown(self):
        self.root_logger.removeFilter(self.filter)
        BaseTest.tearDown(self)
        logging.setLogRecordFactory(self.orig_factory)

    def test_logrecord_class(self):
        # Before the factory is swapped, the filter is expected to raise.
        with self.assertRaises(TypeError):
            self.root_logger.warning(self.next_message())
        logging.setLogRecordFactory(DerivedLogRecord)
        self.root_logger.error(self.next_message())
        expected_lines = [('root', 'ERROR', '2')]
        self.assert_log_lines(expected_lines)
3471
3472
class QueueHandlerTest(BaseTest):
    # Tests for QueueHandler on its own and paired with a QueueListener.

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        self.queue = queue.Queue(-1)
        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
        self.name = 'que'
        self.que_logger = logging.getLogger('que')
        # Keep records from this logger away from the root handler.
        self.que_logger.propagate = False
        self.que_logger.setLevel(logging.WARNING)
        self.que_logger.addHandler(self.que_hdlr)

    def tearDown(self):
        self.que_hdlr.close()
        BaseTest.tearDown(self)

    def test_queue_handler(self):
        # DEBUG and INFO are below the logger's WARNING level, so nothing
        # may be queued for them.
        self.que_logger.debug(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        self.que_logger.info(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        msg = self.next_message()
        self.que_logger.warning(msg)
        data = self.queue.get_nowait()
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(data, logging.LogRecord)
        self.assertEqual(data.name, self.que_logger.name)
        self.assertEqual((data.msg, data.args), (msg, None))

    def test_formatting(self):
        msg = self.next_message()
        levelname = logging.getLevelName(logging.WARNING)
        log_format_str = '{name} -> {levelname}: {message}'
        formatted_msg = log_format_str.format(name=self.name,
                                              levelname=levelname, message=msg)
        formatter = logging.Formatter(self.log_format)
        self.que_hdlr.setFormatter(formatter)
        self.que_logger.warning(msg)
        log_record = self.queue.get_nowait()
        # The queued record is expected to carry the formatted text in both
        # its msg and message attributes.
        self.assertEqual(formatted_msg, log_record.msg)
        self.assertEqual(formatted_msg, log_record.message)

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener(self):
        handler = support.TestHandler(support.Matcher())
        listener = logging.handlers.QueueListener(self.queue, handler)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
        handler.close()

        # Now test with respect_handler_level set

        handler = support.TestHandler(support.Matcher())
        handler.setLevel(logging.CRITICAL)
        listener = logging.handlers.QueueListener(self.queue, handler,
                                                  respect_handler_level=True)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
        handler.close()

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener_with_StreamHandler(self):
        # Test that traceback only appends once (bpo-34334).
        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
        listener.start()
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.que_logger.exception(self.next_message(), exc_info=exc)
        finally:
            # Always stop the listener so its thread cannot outlive the test,
            # even when the logging call above raises.
            listener.stop()
        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener_with_multiple_handlers(self):
        # Test that queue handler format doesn't affect other handler formats (bpo-35726).
        self.que_hdlr.setFormatter(self.root_formatter)
        self.que_logger.addHandler(self.root_hdlr)

        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
        listener.start()
        try:
            self.que_logger.error("error")
        finally:
            # Always stop the listener so its thread cannot outlive the test.
            listener.stop()
        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
3577
if hasattr(logging.handlers, 'QueueListener'):
    import multiprocessing
    from unittest.mock import patch

    class QueueListenerTest(BaseTest):
        """
        Tests based on patch submitted for issue #27930. Ensure that
        QueueListener handles all log messages.
        """

        # Number of times each scenario is repeated.
        repeat = 20

        @staticmethod
        def setup_and_log(log_queue, ident):
            """
            Creates a logger with a QueueHandler that logs to a queue read by a
            QueueListener. Starts the listener, logs five messages, and stops
            the listener.
            """
            logger = logging.getLogger('test_logger_with_id_%s' % ident)
            logger.setLevel(logging.DEBUG)
            handler = logging.handlers.QueueHandler(log_queue)
            logger.addHandler(handler)
            listener = logging.handlers.QueueListener(log_queue)
            listener.start()

            logger.info('one')
            logger.info('two')
            logger.info('three')
            logger.info('four')
            logger.info('five')

            listener.stop()
            logger.removeHandler(handler)
            handler.close()

        @patch.object(logging.handlers.QueueListener, 'handle')
        def test_handle_called_with_queue_queue(self, mock_handle):
            for i in range(self.repeat):
                log_queue = queue.Queue()
                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
                             'correct number of handled log messages')

        @patch.object(logging.handlers.QueueListener, 'handle')
        def test_handle_called_with_mp_queue(self, mock_handle):
            # bpo-28668: The multiprocessing (mp) module is not functional
            # when the mp.synchronize module cannot be imported.
            support.skip_if_broken_multiprocessing_synchronize()
            for i in range(self.repeat):
                log_queue = multiprocessing.Queue()
                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
                log_queue.close()
                log_queue.join_thread()
            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
                             'correct number of handled log messages')

        @staticmethod
        def get_all_from_queue(log_queue):
            """Yield items from log_queue until get_nowait() reports it empty."""
            try:
                while True:
                    yield log_queue.get_nowait()
            except queue.Empty:
                # A bare return ends the generator; a returned value would
                # be invisible to callers that simply iterate.
                return

        def test_no_messages_in_queue_after_stop(self):
            """
            Five messages are logged then the QueueListener is stopped. This
            test then gets everything off the queue. Failure of this test
            indicates that messages were not registered on the queue until
            _after_ the QueueListener stopped.
            """
            # bpo-28668: The multiprocessing (mp) module is not functional
            # when the mp.synchronize module cannot be imported.
            support.skip_if_broken_multiprocessing_synchronize()
            for i in range(self.repeat):
                # Named log_queue so the local does not shadow the queue module.
                log_queue = multiprocessing.Queue()
                self.setup_and_log(log_queue, '%s_%s' %(self.id(), i))
                items = list(self.get_all_from_queue(log_queue))
                log_queue.close()
                log_queue.join_thread()

                # Either nothing is left, or only the listener's sentinel.
                expected = [[], [logging.handlers.QueueListener._sentinel]]
                self.assertIn(items, expected,
                              'Found unexpected messages in queue: %s' % (
                                    [m.msg if isinstance(m, logging.LogRecord)
                                     else m for m in items]))

        def test_calls_task_done_after_stop(self):
            # Issue 36813: Make sure queue.join does not deadlock.
            log_queue = queue.Queue()
            listener = logging.handlers.QueueListener(log_queue)
            listener.start()
            listener.stop()
            with self.assertRaises(ValueError):
                # Make sure all tasks are done and .join won't block.
                log_queue.task_done()
3676
3677
# Zero-length offset shared by UTC.utcoffset() and UTC.dst().
ZERO = datetime.timedelta()

class UTC(datetime.tzinfo):
    """Fixed tzinfo: zero UTC offset, zero DST adjustment, name 'UTC'."""

    def utcoffset(self, dt):
        return ZERO

    # The DST adjustment is the same constant, so the method is reused.
    dst = utcoffset

    def tzname(self, dt):
        return 'UTC'

# Shared instance used wherever a UTC tzinfo is needed.
utc = UTC()
3690
class FormatterTest(unittest.TestCase):
    # Tests for logging.Formatter: the three format styles, validation of
    # format strings at construction time, and time formatting.

    def setUp(self):
        # Attributes shared by every record built through get_record().
        self.common = {
            'name': 'formatter.test',
            'level': logging.DEBUG,
            'pathname': os.path.join('path', 'to', 'dummy.ext'),
            'lineno': 42,
            'exc_info': None,
            'func': None,
            'msg': 'Message with %d %s',
            'args': (2, 'placeholders'),
        }
        # Named sets of attribute overrides for get_record(); none defined.
        self.variants = {
        }

    def get_record(self, name=None):
        # Build a record from the common attributes, optionally overlaid
        # with the variant registered under ``name``.
        result = dict(self.common)
        if name is not None:
            result.update(self.variants[name])
        return logging.makeLogRecord(result)

    def assert_error_message(self, exception, message, *args, **kwargs):
        # Assert that calling args[0](*args[1:], **kwargs) raises
        # ``exception`` and that str() of the raised exception equals
        # ``message``.
        #
        # The exception has to be captured through the assertRaises context
        # manager: the callable form swallows it, which would leave the
        # message unchecked.
        if not args:
            raise TypeError(
                'assert_error_message() requires a callable to invoke')
        func, *func_args = args
        with self.assertRaises(exception) as caught:
            func(*func_args, **kwargs)
        self.assertEqual(message, str(caught.exception))

    def test_percent(self):
        # Test %-formatting
        r = self.get_record()
        f = logging.Formatter('${%(message)s}')
        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
        f = logging.Formatter('%(random)s')
        self.assertRaises(ValueError, f.format, r)
        self.assertFalse(f.usesTime())
        f = logging.Formatter('%(asctime)s')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('%(asctime)-15s')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('%(asctime)#15s')
        self.assertTrue(f.usesTime())

    def test_braces(self):
        # Test {}-formatting
        r = self.get_record()
        f = logging.Formatter('$%{message}%$', style='{')
        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
        f = logging.Formatter('{random}', style='{')
        self.assertRaises(ValueError, f.format, r)
        f = logging.Formatter("{message}", style='{')
        self.assertFalse(f.usesTime())
        f = logging.Formatter('{asctime}', style='{')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('{asctime!s:15}', style='{')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('{asctime:15}', style='{')
        self.assertTrue(f.usesTime())

    def test_dollars(self):
        # Test $-formatting
        r = self.get_record()
        f = logging.Formatter('${message}', style='$')
        self.assertEqual(f.format(r), 'Message with 2 placeholders')
        f = logging.Formatter('$message', style='$')
        self.assertEqual(f.format(r), 'Message with 2 placeholders')
        f = logging.Formatter('$$%${message}%$$', style='$')
        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
        f = logging.Formatter('${random}', style='$')
        self.assertRaises(ValueError, f.format, r)
        self.assertFalse(f.usesTime())
        f = logging.Formatter('${asctime}', style='$')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('$asctime', style='$')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('${message}', style='$')
        self.assertFalse(f.usesTime())
        f = logging.Formatter('${asctime}--', style='$')
        self.assertTrue(f.usesTime())

    def test_format_validate(self):
        # Check correct formatting
        # Percentage style
        f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
        self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
        f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
        self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
        f = logging.Formatter("%(process)#+027.23X")
        self.assertEqual(f._fmt, "%(process)#+027.23X")
        f = logging.Formatter("%(foo)#.*g")
        self.assertEqual(f._fmt, "%(foo)#.*g")

        # StrFormat Style
        f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{")
        self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}")
        f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{")
        self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}")
        f = logging.Formatter("{customfield!s:#<30}", style="{")
        self.assertEqual(f._fmt, "{customfield!s:#<30}")
        f = logging.Formatter("{message!r}", style="{")
        self.assertEqual(f._fmt, "{message!r}")
        f = logging.Formatter("{message!s}", style="{")
        self.assertEqual(f._fmt, "{message!s}")
        f = logging.Formatter("{message!a}", style="{")
        self.assertEqual(f._fmt, "{message!a}")
        f = logging.Formatter("{process!r:4.2}", style="{")
        self.assertEqual(f._fmt, "{process!r:4.2}")
        f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{")
        self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}")
        f = logging.Formatter("{process!s:{w},.{p}}", style="{")
        self.assertEqual(f._fmt, "{process!s:{w},.{p}}")
        f = logging.Formatter("{foo:12.{p}}", style="{")
        self.assertEqual(f._fmt, "{foo:12.{p}}")
        f = logging.Formatter("{foo:{w}.6}", style="{")
        self.assertEqual(f._fmt, "{foo:{w}.6}")
        f = logging.Formatter("{foo[0].bar[1].baz}", style="{")
        self.assertEqual(f._fmt, "{foo[0].bar[1].baz}")
        f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{")
        self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}")
        f = logging.Formatter("{12[k1].bar[k2].baz}", style="{")
        self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}")

        # Dollar style
        f = logging.Formatter("${asctime} - $message", style="$")
        self.assertEqual(f._fmt, "${asctime} - $message")
        f = logging.Formatter("$bar $$", style="$")
        self.assertEqual(f._fmt, "$bar $$")
        f = logging.Formatter("$bar $$$$", style="$")
        self.assertEqual(f._fmt, "$bar $$$$")  # this would print two $($$)

        # Testing when ValueError being raised from incorrect format
        # Percentage Style
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)b")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)_")
        self.assertRaises(ValueError, logging.Formatter, '{asctime}')
        self.assertRaises(ValueError, logging.Formatter, '${message}')
        self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f')  # with both * and decimal number as precision
        self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f')

        # StrFormat Style
        # NOTE(review): the expected messages below assume that '{'-style
        # validation prefixes errors found while parsing the format string
        # with 'invalid format: ' -- confirm against the logging module
        # under test.
        # Testing failure for '-' in field name
        self.assert_error_message(
            ValueError,
            "invalid format: invalid field name/expression: 'name-thing'",
            logging.Formatter, "{name-thing}", style="{"
        )
        # Testing failure for style mismatch
        self.assert_error_message(
            ValueError,
            "invalid format: no fields",
            logging.Formatter, '%(asctime)s', style='{'
        )
        # Testing failure for invalid conversion
        self.assert_error_message(
            ValueError,
            "invalid format: invalid conversion: 'Z'",
            logging.Formatter, '{asctime!Z:15}', style='{'
        )
        self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{')
        self.assert_error_message(
            ValueError,
            "invalid format: expected ':' after conversion specifier",
            logging.Formatter, '{asctime!aa:15}', style='{'
        )
        # Testing failure for invalid spec
        self.assert_error_message(
            ValueError,
            "invalid format: bad specifier: '.2ff'",
            logging.Formatter, '{process:.2ff}', style='{'
        )
        self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{')
        # Testing failure for mismatch braces
        self.assert_error_message(
            ValueError,
            "invalid format: expected '}' before end of string",
            logging.Formatter, '{process', style='{'
        )
        self.assert_error_message(
            ValueError,
            "invalid format: Single '}' encountered in format string",
            logging.Formatter, 'process}', style='{'
        )
        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{')

        # Dollar style
        # Testing failure for mismatch bare $
        self.assert_error_message(
            ValueError,
            "invalid format: bare \'$\' not allowed",
            logging.Formatter, '$bar $$$', style='$'
        )
        self.assert_error_message(
            ValueError,
            "invalid format: bare \'$\' not allowed",
            logging.Formatter, 'bar $', style='$'
        )
        self.assert_error_message(
            ValueError,
            "invalid format: bare \'$\' not allowed",
            logging.Formatter, 'foo $.', style='$'
        )
        # Testing failure for mismatch style
        self.assert_error_message(
            ValueError,
            "invalid format: no fields",
            logging.Formatter, '{asctime}', style='$'
        )
        self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$')

        # Testing failure for incorrect fields
        self.assert_error_message(
            ValueError,
            "invalid format: no fields",
            logging.Formatter, 'foo', style='$'
        )
        self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$')

    def test_invalid_style(self):
        # 'x' is not one of the supported style characters.
        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')

    def test_time(self):
        r = self.get_record()
        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
        # We use None to indicate we want the local timezone
        # We're essentially converting a UTC time to local time
        r.created = time.mktime(dt.astimezone(None).timetuple())
        r.msecs = 123
        f = logging.Formatter('%(asctime)s %(message)s')
        # Format with gmtime so the expected text is the UTC wall-clock time.
        f.converter = time.gmtime
        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
        f.format(r)
        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
3936
class TestBufferingFormatter(logging.BufferingFormatter):
    # Wraps a batch in a header and footer that both show the record count.

    def formatHeader(self, records):
        count = len(records)
        return '[(%d)' % count

    def formatFooter(self, records):
        count = len(records)
        return '(%d)]' % count
3943
class BufferingFormatterTest(unittest.TestCase):
    def setUp(self):
        # Two minimal records whose messages are 'one' and 'two'.
        self.records = [logging.makeLogRecord({'msg': text})
                        for text in ('one', 'two')]

    def test_default(self):
        formatter = logging.BufferingFormatter()
        # An empty batch yields an empty string.
        self.assertEqual('', formatter.format([]))
        self.assertEqual('onetwo', formatter.format(self.records))

    def test_custom(self):
        # Custom header and footer around the default line formatting.
        plain = TestBufferingFormatter()
        self.assertEqual('[(2)onetwo(2)]', plain.format(self.records))
        # The same, with an explicit per-record formatter supplied.
        line_formatter = logging.Formatter('<%(message)s>')
        wrapped = TestBufferingFormatter(line_formatter)
        self.assertEqual('[(2)<one><two>(2)]', wrapped.format(self.records))
3962
class ExceptionTest(BaseTest):
    def test_formatting(self):
        # logging.exception() with stack_info=True must leave both the
        # traceback text and the stack text on the record.
        r = self.root_logger
        h = RecordingHandler()
        r.addHandler(h)
        try:
            raise RuntimeError('deliberate mistake')
        except RuntimeError:
            # Catch only the exception raised on purpose; a bare except
            # would also trap KeyboardInterrupt and SystemExit.
            # NOTE: the final assertion matches the text of the next source
            # line, so the call must stay on one line, exactly as written.
            logging.exception('failed', stack_info=True)
        r.removeHandler(h)
        h.close()
        r = h.records[0]
        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
                                              'call last):\n'))
        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
                                            'deliberate mistake'))
        self.assertTrue(r.stack_info.startswith('Stack (most recent '
                                              'call last):\n'))
        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
                                            'stack_info=True)'))
3983
3984
class LastResortTest(BaseTest):
    def test_last_resort(self):
        # Exercise what happens when the root logger has no handlers.
        root_logger = self.root_logger
        root_logger.removeHandler(self.root_hdlr)
        # Saved so the finally clause can restore the module state.
        saved_last_resort = logging.lastResort
        saved_raise_exceptions = logging.raiseExceptions

        try:
            with support.captured_stderr() as err:
                root_logger.debug('This should not appear')
                self.assertEqual(err.getvalue(), '')
                root_logger.warning('Final chance!')
                self.assertEqual(err.getvalue(), 'Final chance!\n')

            # No handlers and no last resort, so 'No handlers' message
            logging.lastResort = None
            with support.captured_stderr() as err:
                root_logger.warning('Final chance!')
                no_handlers = 'No handlers could be found for logger "root"\n'
                self.assertEqual(err.getvalue(), no_handlers)

            # 'No handlers' message only printed once
            with support.captured_stderr() as err:
                root_logger.warning('Final chance!')
                self.assertEqual(err.getvalue(), '')

            # If raiseExceptions is False, no message is printed
            root_logger.manager.emittedNoHandlerWarning = False
            logging.raiseExceptions = False
            with support.captured_stderr() as err:
                root_logger.warning('Final chance!')
                self.assertEqual(err.getvalue(), '')
        finally:
            root_logger.addHandler(self.root_hdlr)
            logging.lastResort = saved_last_resort
            logging.raiseExceptions = saved_raise_exceptions
4022
4023
class FakeHandler:
    """Handler stand-in whose lifecycle methods record their own calls."""

    def __init__(self, identifier, called):
        # Install one recording callable per lifecycle method name.
        self.acquire = self.record_call(identifier, 'acquire', called)
        self.flush = self.record_call(identifier, 'flush', called)
        self.close = self.record_call(identifier, 'close', called)
        self.release = self.record_call(identifier, 'release', called)

    def record_call(self, identifier, method_name, called):
        # Return a no-argument callable that appends
        # "<identifier> - <method_name>" to ``called`` each time it runs.
        return lambda: called.append(
            '{} - {}'.format(identifier, method_name))
4034
4035
class RecordingHandler(logging.NullHandler):
    """NullHandler that stores every record passed to handle()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Records in the order they were handled.
        self.records = []

    def handle(self, record):
        """Remember the record; nothing is emitted anywhere."""
        self.records += [record]
4045
4046
class ShutdownTest(BaseTest):

    """Tests for logging.shutdown(), driven by fake handlers."""

    def setUp(self):
        super().setUp()
        # Call log that the fake handlers append to.
        self.called = []

        # Several tests change logging.raiseExceptions; restore it afterwards.
        original_setting = logging.raiseExceptions
        self.addCleanup(setattr, logging, 'raiseExceptions', original_setting)

    def raise_error(self, error):
        # Return a no-argument callable that raises a fresh ``error``.
        def fail():
            raise error()
        return fail

    def test_no_failure(self):
        # create some fake handlers
        fakes = [FakeHandler(ident, self.called) for ident in (0, 1, 2)]

        # create live weakref to those handlers
        refs = [logging.weakref.ref(fake) for fake in fakes]

        logging.shutdown(handlerList=refs)

        # Handlers are expected to be shut down last-registered first.
        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
        self.assertEqual(expected, self.called)

    def _test_with_failure_in_method(self, method, error):
        # Make one lifecycle method raise and check that release() is
        # still the last call recorded.
        fake = FakeHandler(0, self.called)
        setattr(fake, method, self.raise_error(error))
        refs = [logging.weakref.ref(fake)]

        logging.shutdown(handlerList=refs)

        self.assertEqual('0 - release', self.called[-1])

    def test_with_ioerror_in_acquire(self):
        self._test_with_failure_in_method(method='acquire', error=OSError)

    def test_with_ioerror_in_flush(self):
        self._test_with_failure_in_method(method='flush', error=OSError)

    def test_with_ioerror_in_close(self):
        self._test_with_failure_in_method(method='close', error=OSError)

    def test_with_valueerror_in_acquire(self):
        self._test_with_failure_in_method(method='acquire', error=ValueError)

    def test_with_valueerror_in_flush(self):
        self._test_with_failure_in_method(method='flush', error=ValueError)

    def test_with_valueerror_in_close(self):
        self._test_with_failure_in_method(method='close', error=ValueError)

    def test_with_other_error_in_acquire_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method(method='acquire', error=IndexError)

    def test_with_other_error_in_flush_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method(method='flush', error=IndexError)

    def test_with_other_error_in_close_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method(method='close', error=IndexError)

    def test_with_other_error_in_acquire_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('acquire', IndexError)

    def test_with_other_error_in_flush_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('flush', IndexError)

    def test_with_other_error_in_close_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('close', IndexError)
4132
4133
class ModuleLevelMiscTest(BaseTest):

    """Test suite for some module level methods."""

    def test_disable(self):
        old_disable = logging.root.manager.disable
        # confirm our assumptions are correct
        self.assertEqual(old_disable, 0)
        # Put the manager-wide threshold back however the test ends.
        self.addCleanup(logging.disable, old_disable)

        logging.disable(83)
        self.assertEqual(logging.root.manager.disable, 83)

        # test the default value introduced in 3.7
        # (Issue #28524)
        logging.disable()
        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)

    def _test_log(self, method, level=None):
        """Call the module-level logging function *method* and check it.

        *method* is the name of a function in the logging module.  When
        *level* is not None it is passed as the first positional argument
        (as logging.log() expects); otherwise the expected level is the
        logging constant named ``method.upper()``.

        Asserts that exactly one record reaches a RecordingHandler attached
        to the root logger, that its message and level match, and that
        logging.basicConfig was not invoked.
        """
        called = []
        # Replace basicConfig with a recorder so any call is detectable.
        support.patch(self, logging, 'basicConfig',
                      lambda *a, **kw: called.append((a, kw)))

        recording = RecordingHandler()
        logging.root.addHandler(recording)

        log_method = getattr(logging, method)
        if level is not None:
            log_method(level, "test me: %r", recording)
        else:
            log_method("test me: %r", recording)

        self.assertEqual(len(recording.records), 1)
        record = recording.records[0]
        self.assertEqual(record.getMessage(), "test me: %r" % recording)

        expected_level = level if level is not None else getattr(logging, method.upper())
        self.assertEqual(record.levelno, expected_level)

        # basicConfig was not called!
        self.assertEqual(called, [])

    def test_log(self):
        self._test_log('log', logging.ERROR)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')

    def test_set_logger_class(self):
        self.assertRaises(TypeError, logging.setLoggerClass, object)

        class MyLogger(logging.Logger):
            pass

        logging.setLoggerClass(MyLogger)
        try:
            self.assertEqual(logging.getLoggerClass(), MyLogger)
        finally:
            # Always reinstate the stock class so a failed assertion cannot
            # leave MyLogger installed for the tests that follow.
            logging.setLoggerClass(logging.Logger)
        self.assertEqual(logging.getLoggerClass(), logging.Logger)

    def test_subclass_logger_cache(self):
        # bpo-37258
        message = []

        class MyLogger(logging.getLoggerClass()):
            def __init__(self, name='MyLogger', level=logging.NOTSET):
                super().__init__(name, level)
                message.append('initialized')

        logging.setLoggerClass(MyLogger)
        try:
            # Everything after setLoggerClass() sits inside the try so the
            # logger class is restored even if the first assertion fails.
            logger = logging.getLogger('just_some_logger')
            self.assertEqual(message, ['initialized'])
            stream = io.StringIO()
            h = logging.StreamHandler(stream)
            logger.addHandler(h)
            try:
                logger.setLevel(logging.DEBUG)
                logger.debug("hello")
                self.assertEqual(stream.getvalue().strip(), "hello")

                # Empty the buffer before the second round.
                stream.truncate(0)
                stream.seek(0)

                logger.setLevel(logging.INFO)
                logger.debug("hello")
                self.assertEqual(stream.getvalue(), "")
            finally:
                logger.removeHandler(h)
                h.close()
        finally:
            logging.setLoggerClass(logging.Logger)

    @support.requires_type_collecting
    def test_logging_at_shutdown(self):
        # Issue #20037
        code = """if 1:
            import logging

            class A:
                def __del__(self):
                    try:
                        raise ValueError("some error")
                    except Exception:
                        logging.exception("exception in __del__")

            a = A()"""
        # The snippet runs in a child interpreter; its stderr is inspected.
        rc, out, err = assert_python_ok("-c", code)
        err = err.decode()
        self.assertIn("exception in __del__", err)
        self.assertIn("ValueError: some error", err)

    def test_recursion_error(self):
        # Issue 36272
        code = """if 1:
            import logging

            def rec():
                logging.error("foo")
                rec()

            rec()"""
        # The child interpreter is expected to exit with status 1 and
        # without the fatal stack-overflow message on stderr.
        rc, out, err = assert_python_failure("-c", code)
        err = err.decode()
        self.assertNotIn("Cannot recover from stack overflow.", err)
        self.assertEqual(rc, 1)
4270
4271
class LogRecordTest(BaseTest):
    def test_str_rep(self):
        r = logging.makeLogRecord({})
        s = str(r)
        self.assertTrue(s.startswith('<LogRecord: '))
        self.assertTrue(s.endswith('>'))

    def test_dict_arg(self):
        h = RecordingHandler()
        r = logging.getLogger()
        r.addHandler(h)
        try:
            d = {'less': 'more'}
            logging.warning('less is %(less)s', d)
            # The dict itself (not a copy) must end up as the record's args.
            self.assertIs(h.records[0].args, d)
            self.assertEqual(h.records[0].message, 'less is more')
        finally:
            # Detach and close the handler even when an assertion fails, so
            # it does not stay attached to the root logger.
            r.removeHandler(h)
            h.close()

    def test_multiprocessing(self):
        r = logging.makeLogRecord({})
        self.assertEqual(r.processName, 'MainProcess')
        # Only the import is guarded: a missing multiprocessing module is
        # the one condition that makes the second check inapplicable.
        try:
            import multiprocessing as mp
        except ImportError:
            pass
        else:
            r = logging.makeLogRecord({})
            self.assertEqual(r.processName, mp.current_process().name)

    def test_optional(self):
        r = logging.makeLogRecord({})
        NOT_NONE = self.assertIsNotNone
        NOT_NONE(r.thread)
        NOT_NONE(r.threadName)
        NOT_NONE(r.process)
        NOT_NONE(r.processName)
        # Remember the module-level switches so they can be restored.
        log_threads = logging.logThreads
        log_processes = logging.logProcesses
        log_multiprocessing = logging.logMultiprocessing
        try:
            logging.logThreads = False
            logging.logProcesses = False
            logging.logMultiprocessing = False
            # With all three switches off the optional fields must be None.
            r = logging.makeLogRecord({})
            NONE = self.assertIsNone
            NONE(r.thread)
            NONE(r.threadName)
            NONE(r.process)
            NONE(r.processName)
        finally:
            logging.logThreads = log_threads
            logging.logProcesses = log_processes
            logging.logMultiprocessing = log_multiprocessing
4324
class BasicConfigTest(unittest.TestCase):

    """Test suite for logging.basicConfig."""

    def setUp(self):
        super().setUp()
        # Snapshot the global logging state so cleanup() can put it back.
        self.handlers = logging.root.handlers
        self.saved_handlers = logging._handlers.copy()
        self.saved_handler_list = logging._handlerList[:]
        self.original_logging_level = logging.root.level
        self.addCleanup(self.cleanup)
        # Every test starts with a root logger that has no handlers.
        logging.root.handlers = []

    def tearDown(self):
        # Iterate over a copy: removeHandler() mutates root.handlers.
        for h in logging.root.handlers[:]:
            logging.root.removeHandler(h)
            h.close()
        super().tearDown()

    def cleanup(self):
        """Restore the logging state captured in setUp()."""
        logging.root.handlers = self.handlers
        logging._handlers.clear()
        logging._handlers.update(self.saved_handlers)
        logging._handlerList[:] = self.saved_handler_list
        logging.root.setLevel(self.original_logging_level)

    def test_no_kwargs(self):
        logging.basicConfig()

        # handler defaults to a StreamHandler to sys.stderr
        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.StreamHandler)
        self.assertEqual(handler.stream, sys.stderr)

        formatter = handler.formatter
        # format defaults to logging.BASIC_FORMAT
        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
        # datefmt defaults to None
        self.assertIsNone(formatter.datefmt)
        # style defaults to %
        self.assertIsInstance(formatter._style, logging.PercentStyle)

        # level is not explicitly set
        self.assertEqual(logging.root.level, self.original_logging_level)

    def test_strformatstyle(self):
        with support.captured_stdout() as output:
            logging.basicConfig(stream=sys.stdout, style="{")
            logging.error("Log an error")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue().strip(),
                "ERROR:root:Log an error")

    def test_stringtemplatestyle(self):
        with support.captured_stdout() as output:
            logging.basicConfig(stream=sys.stdout, style="$")
            logging.error("Log an error")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue().strip(),
                "ERROR:root:Log an error")

    def test_filename(self):
        logging.basicConfig(filename='test.log')
        # Cleanups are registered as soon as each resource exists, so a
        # failing assertion cannot leak an open handle or leave test.log
        # behind.  They run in reverse order: handlers are closed first,
        # the file is removed last.
        self.addCleanup(os.remove, 'test.log')

        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.addCleanup(handler.close)
        self.assertIsInstance(handler, logging.FileHandler)

        expected = logging.FileHandler('test.log', 'a')
        self.addCleanup(expected.close)
        self.assertEqual(handler.stream.mode, expected.stream.mode)
        self.assertEqual(handler.stream.name, expected.stream.name)

    def test_filemode(self):
        logging.basicConfig(filename='test.log', filemode='wb')
        # Same ordering as test_filename: register each cleanup as soon as
        # the resource exists; the file is removed after both closes.
        self.addCleanup(os.remove, 'test.log')

        handler = logging.root.handlers[0]
        self.addCleanup(handler.close)
        expected = logging.FileHandler('test.log', 'wb')
        self.addCleanup(expected.close)
        self.assertEqual(handler.stream.mode, expected.stream.mode)

    def test_stream(self):
        stream = io.StringIO()
        self.addCleanup(stream.close)
        logging.basicConfig(stream=stream)

        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.StreamHandler)
        self.assertEqual(handler.stream, stream)

    def test_format(self):
        logging.basicConfig(format='%(asctime)s - %(message)s')

        formatter = logging.root.handlers[0].formatter
        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')

    def test_datefmt(self):
        logging.basicConfig(datefmt='bar')

        formatter = logging.root.handlers[0].formatter
        self.assertEqual(formatter.datefmt, 'bar')

    def test_style(self):
        logging.basicConfig(style='$')

        formatter = logging.root.handlers[0].formatter
        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)

    def test_level(self):
        old_level = logging.root.level
        self.addCleanup(logging.root.setLevel, old_level)

        logging.basicConfig(level=57)
        self.assertEqual(logging.root.level, 57)
        # Test that second call has no effect
        logging.basicConfig(level=58)
        self.assertEqual(logging.root.level, 57)

    def test_incompatible(self):
        assertRaises = self.assertRaises
        handlers = [logging.StreamHandler()]
        stream = sys.stderr
        # Any two of filename / stream / handlers together are rejected.
        assertRaises(ValueError, logging.basicConfig, filename='test.log',
                                                      stream=stream)
        assertRaises(ValueError, logging.basicConfig, filename='test.log',
                                                      handlers=handlers)
        assertRaises(ValueError, logging.basicConfig, stream=stream,
                                                      handlers=handlers)
        # Issue 23207: test for invalid kwargs
        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
        # Should pop both filename and filemode even if filename is None
        logging.basicConfig(filename=None, filemode='a')

    def test_handlers(self):
        handlers = [
            logging.StreamHandler(),
            logging.StreamHandler(sys.stdout),
            logging.StreamHandler(),
        ]
        f = logging.Formatter()
        # Only the third handler gets an explicit formatter up front.
        handlers[2].setFormatter(f)
        logging.basicConfig(handlers=handlers)
        self.assertIs(handlers[0], logging.root.handlers[0])
        self.assertIs(handlers[1], logging.root.handlers[1])
        self.assertIs(handlers[2], logging.root.handlers[2])
        self.assertIsNotNone(handlers[0].formatter)
        self.assertIsNotNone(handlers[1].formatter)
        # The pre-set formatter is kept; the other two share one formatter.
        self.assertIs(handlers[2].formatter, f)
        self.assertIs(handlers[0].formatter, handlers[1].formatter)

    def test_force(self):
        old_string_io = io.StringIO()
        new_string_io = io.StringIO()
        old_handlers = [logging.StreamHandler(old_string_io)]
        new_handlers = [logging.StreamHandler(new_string_io)]
        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
        logging.warning('warn')
        logging.info('info')
        logging.debug('debug')
        self.assertEqual(len(logging.root.handlers), 1)
        # force=True must replace the existing configuration.
        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
                            force=True)
        logging.warning('warn')
        logging.info('info')
        logging.debug('debug')
        self.assertEqual(len(logging.root.handlers), 1)
        self.assertEqual(old_string_io.getvalue().strip(),
                         'WARNING:root:warn')
        self.assertEqual(new_string_io.getvalue().strip(),
                         'WARNING:root:warn\nINFO:root:info')

    def _test_log(self, method, level=None):
        """Check that logging.<method>() calls basicConfig() with no args.

        *method* names a module-level logging function.  When *level* is
        not None it is passed as the first positional argument, as
        logging.log() expects.
        """
        # logging.root has no handlers so basicConfig should be called
        called = []

        old_basic_config = logging.basicConfig
        def my_basic_config(*a, **kw):
            old_basic_config()
            old_level = logging.root.level
            logging.root.setLevel(100)  # avoid having messages in stderr
            self.addCleanup(logging.root.setLevel, old_level)
            called.append((a, kw))

        support.patch(self, logging, 'basicConfig', my_basic_config)

        log_method = getattr(logging, method)
        if level is not None:
            log_method(level, "test me")
        else:
            log_method("test me")

        # basicConfig was called with no arguments
        self.assertEqual(called, [((), {})])

    def test_log(self):
        self._test_log('log', logging.WARNING)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')
4550
4551
class LoggerAdapterTest(unittest.TestCase):
    def setUp(self):
        super().setUp()
        old_handler_list = logging._handlerList[:]

        # The adapter under test wraps the root logger, which gets a
        # recording handler for the duration of each test.
        self.recording = RecordingHandler()
        self.logger = logging.root
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)

        def cleanup():
            logging._handlerList[:] = old_handler_list

        self.addCleanup(cleanup)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)

    def test_exception(self):
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.adapter.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_exception_excinfo(self):
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e

        # Called outside the except block: exc_info is passed explicitly.
        self.adapter.exception('exc_info test', exc_info=exc)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_critical(self):
        msg = 'critical test! %r'
        self.adapter.critical(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.CRITICAL)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))

    def test_is_enabled_for(self):
        old_disable = self.adapter.logger.manager.disable
        self.adapter.logger.manager.disable = 33
        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
                        old_disable)
        self.assertFalse(self.adapter.isEnabledFor(32))

    def test_has_handlers(self):
        self.assertTrue(self.adapter.hasHandlers())

        # Iterate over a snapshot: removeHandler() mutates logger.handlers,
        # and removing from a list while iterating it skips entries when
        # more than one handler is attached.
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)

        self.assertFalse(self.logger.hasHandlers())
        self.assertFalse(self.adapter.hasHandlers())

    def test_nested(self):
        class Adapter(logging.LoggerAdapter):
            prefix = 'Adapter'

            def process(self, msg, kwargs):
                return f"{self.prefix} {msg}", kwargs

        msg = 'Adapters can be nested, yo.'
        adapter = Adapter(logger=self.logger, extra=None)
        adapter_adapter = Adapter(logger=adapter, extra=None)
        adapter_adapter.prefix = 'AdapterAdapter'
        self.assertEqual(repr(adapter), repr(adapter_adapter))
        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.CRITICAL)
        # The outer adapter's prefix is applied first, the inner one last.
        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
        self.assertEqual(record.args, (self.recording,))
        # The manager attribute must be shared down the adapter chain.
        orig_manager = adapter_adapter.manager
        self.assertIs(adapter.manager, orig_manager)
        self.assertIs(self.logger.manager, orig_manager)
        temp_manager = object()
        try:
            adapter_adapter.manager = temp_manager
            self.assertIs(adapter_adapter.manager, temp_manager)
            self.assertIs(adapter.manager, temp_manager)
            self.assertIs(self.logger.manager, temp_manager)
        finally:
            adapter_adapter.manager = orig_manager
        self.assertIs(adapter_adapter.manager, orig_manager)
        self.assertIs(adapter.manager, orig_manager)
        self.assertIs(self.logger.manager, orig_manager)
4658
4659
class LoggerTest(BaseTest):

    def setUp(self):
        super().setUp()
        # A stand-alone Logger instance with a recording handler attached.
        self.recording = RecordingHandler()
        self.logger = logging.Logger(name='blah')
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)
        self.addCleanup(logging.shutdown)

    def test_set_invalid_level(self):
        self.assertRaises(TypeError, self.logger.setLevel, object())

    def test_exception(self):
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.logger.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_log_invalid_level_with_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', True):
            self.assertRaises(TypeError, self.logger.log, '10', 'test message')

    def test_log_invalid_level_no_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', False):
            self.logger.log('10', 'test message')  # no exception happens

    def test_find_caller_with_stack_info(self):
        called = []
        # Capture what has been written to the file object at the moment
        # print_stack is invoked.
        support.patch(self, logging.traceback, 'print_stack',
                      lambda f, file: called.append(file.getvalue()))

        self.logger.findCaller(stack_info=True)

        self.assertEqual(len(called), 1)
        self.assertEqual('Stack (most recent call last):\n', called[0])

    def test_find_caller_with_stacklevel(self):
        the_level = 1

        # NOTE: the definition order of the three helpers matters; the
        # assertions below compare source line numbers between them.
        def innermost():
            self.logger.warning('test', stacklevel=the_level)

        def inner():
            innermost()

        def outer():
            inner()

        records = self.recording.records
        outer()
        self.assertEqual(records[-1].funcName, 'innermost')
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'inner')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'outer')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
        self.assertGreater(records[-1].lineno, lineno)

    def test_make_record_with_extra_overwrite(self):
        name = 'my record'
        level = 13
        fn = lno = msg = args = exc_info = func = sinfo = None
        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
                                       exc_info, func, sinfo)

        # Every attribute of a fresh record, plus 'message' and 'asctime',
        # must be rejected as a key in *extra*.
        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
            extra = {key: 'some value'}
            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
                              fn, lno, msg, args, exc_info,
                              extra=extra, sinfo=sinfo)

    def test_make_record_with_extra_no_overwrite(self):
        name = 'my record'
        level = 13
        fn = lno = msg = args = exc_info = func = sinfo = None
        extra = {'valid_key': 'some value'}
        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
                                        exc_info, extra=extra, sinfo=sinfo)
        self.assertIn('valid_key', result.__dict__)

    def test_has_handlers(self):
        self.assertTrue(self.logger.hasHandlers())

        # Iterate over a snapshot: removeHandler() mutates logger.handlers,
        # and removing from a list while iterating it skips entries when
        # more than one handler is attached.
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)
        self.assertFalse(self.logger.hasHandlers())

    def test_has_handlers_no_propagate(self):
        child_logger = logging.getLogger('blah.child')
        child_logger.propagate = False
        self.assertFalse(child_logger.hasHandlers())

    def test_is_enabled_for(self):
        old_disable = self.logger.manager.disable
        self.logger.manager.disable = 23
        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
        self.assertFalse(self.logger.isEnabledFor(22))

    def test_is_enabled_for_disabled_logger(self):
        old_disabled = self.logger.disabled
        old_disable = self.logger.manager.disable

        self.logger.disabled = True
        self.logger.manager.disable = 21

        self.addCleanup(setattr, self.logger, 'disabled', old_disabled)
        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)

        # Level 22 is above the manager threshold of 21, yet the logger
        # itself is disabled.
        self.assertFalse(self.logger.isEnabledFor(22))

    def test_root_logger_aliases(self):
        root = logging.getLogger()
        self.assertIs(root, logging.root)
        self.assertIs(root, logging.getLogger(None))
        self.assertIs(root, logging.getLogger(''))
        self.assertIs(root, logging.getLogger('foo').root)
        self.assertIs(root, logging.getLogger('foo.bar').root)
        self.assertIs(root, logging.getLogger('foo').parent)

        self.assertIsNot(root, logging.getLogger('\0'))
        self.assertIsNot(root, logging.getLogger('foo.bar').parent)

    def test_invalid_names(self):
        self.assertRaises(TypeError, logging.getLogger, any)
        self.assertRaises(TypeError, logging.getLogger, b'foo')

    def test_pickling(self):
        # Unpickling must hand back the very same logger object.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
                logger = logging.getLogger(name)
                s = pickle.dumps(logger, proto)
                unpickled = pickle.loads(s)
                self.assertIs(unpickled, logger)

    def test_caching(self):
        root = self.root_logger
        logger1 = logging.getLogger("abc")
        logger2 = logging.getLogger("abc.def")

        # Set root logger level and ensure cache is empty
        root.setLevel(logging.ERROR)
        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
        self.assertEqual(logger2._cache, {})

        # Ensure cache is populated and calls are consistent
        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
        self.assertEqual(root._cache, {})
        self.assertTrue(logger2.isEnabledFor(logging.ERROR))

        # Ensure root cache gets populated
        self.assertEqual(root._cache, {})
        self.assertTrue(root.isEnabledFor(logging.ERROR))
        self.assertEqual(root._cache, {logging.ERROR: True})

        # Set parent logger level and ensure caches are emptied
        logger1.setLevel(logging.CRITICAL)
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})

        # Ensure logger2 uses parent logger's effective level
        self.assertFalse(logger2.isEnabledFor(logging.ERROR))

        # Set level to NOTSET and ensure caches are empty
        logger2.setLevel(logging.NOTSET)
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})
        self.assertEqual(logger1._cache, {})
        self.assertEqual(root._cache, {})

        # Verify logger2 follows parent and not root
        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
        self.assertTrue(root.isEnabledFor(logging.ERROR))

        # Disable logging in manager and ensure caches are clear
        logging.disable()
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})
        self.assertEqual(logger1._cache, {})
        self.assertEqual(root._cache, {})

        # Ensure no loggers are enabled
        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
4871
4872
class BaseFileTest(BaseTest):
    "Base class for handler tests that write log files"

    def setUp(self):
        BaseTest.setUp(self)
        # mkstemp() creates the file; only its name is kept, the descriptor
        # is closed right away.
        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
        os.close(fd)
        # Extra files registered through assertLogFile() for deletion.
        self.rmfiles = []

    def tearDown(self):
        try:
            for fn in self.rmfiles:
                os.unlink(fn)
            if os.path.exists(self.fn):
                os.unlink(self.fn)
        finally:
            # Run the base-class teardown even when a file could not be
            # deleted; the unlink error still propagates.
            BaseTest.tearDown(self)

    def assertLogFile(self, filename):
        "Assert a log file is there and register it for deletion"
        self.assertTrue(os.path.exists(filename),
                        msg="Log file %r does not exist" % filename)
        self.rmfiles.append(filename)
4894
4895
class FileHandlerTest(BaseFileTest):
    def test_delay(self):
        # Start without the file so its creation can be observed.
        os.unlink(self.fn)
        fh = logging.FileHandler(self.fn, delay=True)
        try:
            # With delay=True nothing is opened until a record is handled.
            self.assertIsNone(fh.stream)
            self.assertFalse(os.path.exists(self.fn))
            fh.handle(logging.makeLogRecord({}))
            self.assertIsNotNone(fh.stream)
            self.assertTrue(os.path.exists(self.fn))
        finally:
            # Close the handler even when an assertion fails so the file is
            # not left open.
            fh.close()
4906
class RotatingFileHandlerTest(BaseFileTest):
    """Tests for logging.handlers.RotatingFileHandler.

    Every test closes its handler in a ``finally`` clause so that a failing
    assertion does not leave the log file open.
    """

    def next_rec(self):
        """Return a new DEBUG LogRecord whose message is self.next_message()."""
        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
                                 self.next_message(), None, None, None)

    def test_should_not_rollover(self):
        # If maxbytes is zero rollover never occurs
        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
        try:
            self.assertFalse(rh.shouldRollover(None))
        finally:
            rh.close()

    def test_should_rollover(self):
        # With maxBytes=1 any record asks for a rollover.
        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
        try:
            self.assertTrue(rh.shouldRollover(self.next_rec()))
        finally:
            rh.close()

    def test_file_created(self):
        # checks that the file is created and assumes it was created
        # by us
        rh = logging.handlers.RotatingFileHandler(self.fn)
        try:
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn)
        finally:
            rh.close()

    def test_rollover_filenames(self):
        # A custom namer must be applied to every backup file name, and no
        # more than backupCount backups may exist.
        def namer(name):
            return name + ".test"
        rh = logging.handlers.RotatingFileHandler(
            self.fn, backupCount=2, maxBytes=1)
        try:
            rh.namer = namer
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn)
            rh.emit(self.next_rec())
            self.assertLogFile(namer(self.fn + ".1"))
            rh.emit(self.next_rec())
            self.assertLogFile(namer(self.fn + ".2"))
            self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        finally:
            rh.close()

    @support.requires_zlib
    def test_rotator(self):
        # A custom rotator (zlib-compressing the file being rotated) together
        # with a custom namer (appending ".gz") must be used for each backup.
        def namer(name):
            return name + ".gz"

        def rotator(source, dest):
            with open(source, "rb") as sf:
                data = sf.read()
                compressed = zlib.compress(data, 9)
                with open(dest, "wb") as df:
                    df.write(compressed)
            os.remove(source)

        def check_backup(fn, record):
            # The backup must decompress to exactly the record's message
            # followed by the platform line separator.
            with open(fn, "rb") as f:
                compressed = f.read()
            data = zlib.decompress(compressed)
            self.assertEqual(data.decode("ascii"), record.msg + os.linesep)

        rh = logging.handlers.RotatingFileHandler(
            self.fn, backupCount=2, maxBytes=1)
        try:
            rh.rotator = rotator
            rh.namer = namer
            m1 = self.next_rec()
            rh.emit(m1)
            self.assertLogFile(self.fn)
            m2 = self.next_rec()
            rh.emit(m2)
            # After the second emit the newest backup holds the first message.
            fn = namer(self.fn + ".1")
            self.assertLogFile(fn)
            check_backup(fn, m1)
            # The third emit shifts that backup to the ".2" slot.
            rh.emit(self.next_rec())
            fn = namer(self.fn + ".2")
            self.assertLogFile(fn)
            check_backup(fn, m1)
            # The fourth emit replaces the ".2" backup with the second
            # message; a ".3" backup must never appear (backupCount=2).
            rh.emit(self.next_rec())
            check_backup(namer(self.fn + ".2"), m2)
            self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        finally:
            rh.close()
4990
class TimedRotatingFileHandlerTest(BaseFileTest):
    """Tests for logging.handlers.TimedRotatingFileHandler."""

    # other test methods added below
    def test_rollover(self):
        # Emit one record, wait a little over the one-second rotation
        # interval, emit another, and then look for the rotated file.
        fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
                                                       backupCount=1)
        try:
            fmt = logging.Formatter('%(asctime)s %(message)s')
            fh.setFormatter(fmt)
            r1 = logging.makeLogRecord({'msg': 'testing - initial'})
            fh.emit(r1)
            self.assertLogFile(self.fn)
            time.sleep(1.1)    # a little over a second ...
            r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
            fh.emit(r2)
        finally:
            # Close even if something above fails, so the log file is not
            # left open.
            fh.close()
        # At this point, we should have a recent rotated file which we
        # can test for the existence of. However, in practice, on some
        # machines which run really slowly, we don't know how far back
        # in time to go to look for the log file. So, we go back a fair
        # bit, and stop as soon as we see a rotated file. In theory this
        # could of course still fail, but the chances are lower.
        found = False
        now = datetime.datetime.now()
        GO_BACK = 5 * 60 # seconds
        for secs in range(GO_BACK):
            prev = now - datetime.timedelta(seconds=secs)
            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
            found = os.path.exists(fn)
            if found:
                self.rmfiles.append(fn)
                break
        msg = 'No rotated files found, went back %d seconds' % GO_BACK
        if not found:
            # print additional diagnostics; everything goes to stderr so
            # that the dump cannot be interleaved across two streams
            dn, fn = os.path.split(self.fn)
            files = [f for f in os.listdir(dn) if f.startswith(fn)]
            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
            print('The only matching files are: %s' % files, file=sys.stderr)
            for f in files:
                print('Contents of %s:' % f, file=sys.stderr)
                path = os.path.join(dn, f)
                with open(path, 'r') as tf:
                    print(tf.read(), file=sys.stderr)
        self.assertTrue(found, msg=msg)

    def test_invalid(self):
        # Unknown or malformed 'when' values must be rejected.  delay=True
        # is passed so that no file needs to be opened by these calls.
        assertRaises = self.assertRaises
        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
                     self.fn, 'X', delay=True)
        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
                     self.fn, 'W', delay=True)
        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
                     self.fn, 'W7', delay=True)

    def test_compute_rollover_daily_attime(self):
        # Daily rotation at 12:00 UTC, evaluated relative to the epoch.
        currentTime = 0
        atTime = datetime.time(12, 0, 0)
        rh = logging.handlers.TimedRotatingFileHandler(
            self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True,
            atTime=atTime)
        try:
            # Before noon: roll over at noon of the same day.
            actual = rh.computeRollover(currentTime)
            self.assertEqual(actual, currentTime + 12 * 60 * 60)

            # After noon (13:00): roll over at noon of the next day.
            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
            self.assertEqual(actual, currentTime + 36 * 60 * 60)
        finally:
            rh.close()

    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
    def test_compute_rollover_weekly_attime(self):
        # Weekly rotation at 12:00 UTC for each possible rollover weekday,
        # evaluated relative to the start (00:00 UTC) of the current day.
        currentTime = int(time.time())
        today = currentTime - currentTime % 86400

        atTime = datetime.time(12, 0, 0)

        wday = time.gmtime(today).tm_wday
        for day in range(7):
            rh = logging.handlers.TimedRotatingFileHandler(
                self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True,
                atTime=atTime)
            try:
                if wday > day:
                    # The rollover day has already passed this week, so we
                    # go over into next week
                    expected = (7 - wday + day)
                else:
                    expected = (day - wday)
                # At this point expected is in days from now, convert to seconds
                expected *= 24 * 60 * 60
                # Add in the rollover time
                expected += 12 * 60 * 60
                # Add in adjustment for today
                expected += today
                actual = rh.computeRollover(today)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone)
                    print('local vars: %s' % locals())
                self.assertEqual(actual, expected)
                if day == wday:
                    # goes into following week
                    expected += 7 * 24 * 60 * 60
                actual = rh.computeRollover(today + 13 * 60 * 60)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone)
                    print('local vars: %s' % locals())
                self.assertEqual(actual, expected)
            finally:
                rh.close()
5099
5100
def secs(**kw):
    """Return the timedelta built from *kw* as a whole number of seconds.

    The keyword arguments are passed straight to datetime.timedelta; the
    result is floor-divided by one second, so fractions round down.
    """
    span = datetime.timedelta(**kw)
    one_second = datetime.timedelta(seconds=1)
    return span // one_second
5103
# Generate one test_compute_rollover_<when> method per rotation unit and
# attach it to TimedRotatingFileHandlerTest.  Each entry pairs a 'when' value
# with the expected rollover time (in seconds) for a current time of 0.
for when, exp in (('S', 1),
                  ('M', 60),
                  ('H', 60 * 60),
                  ('D', 60 * 60 * 24),
                  ('MIDNIGHT', 60 * 60 * 24),
                  # current time (epoch start) is a Thursday, W0 means Monday
                  ('W0', secs(days=4, hours=24)),
                 ):
    # when/exp are bound as default arguments so that every generated method
    # keeps its own pair rather than the values of the last loop iteration.
    def test_compute_rollover(self, when=when, exp=exp):
        rh = logging.handlers.TimedRotatingFileHandler(
            self.fn, when=when, interval=1, backupCount=0, utc=True)
        try:
            currentTime = 0.0
            actual = rh.computeRollover(currentTime)
            if exp != actual:
                # Failures occur on some systems for MIDNIGHT and W0.
                # Print detailed calculation for MIDNIGHT so we can try to see
                # what's going on
                if when == 'MIDNIGHT':
                    try:
                        if rh.utc:
                            t = time.gmtime(currentTime)
                        else:
                            t = time.localtime(currentTime)
                        currentHour = t[3]
                        currentMinute = t[4]
                        currentSecond = t[5]
                        # r is the number of seconds left between now and midnight
                        r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
                                                           currentMinute) * 60 +
                                currentSecond)
                        result = currentTime + r
                        print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
                        print('currentHour: %s' % currentHour, file=sys.stderr)
                        print('currentMinute: %s' % currentMinute, file=sys.stderr)
                        print('currentSecond: %s' % currentSecond, file=sys.stderr)
                        print('r: %s' % r, file=sys.stderr)
                        print('result: %s' % result, file=sys.stderr)
                    except Exception as e:
                        # The diagnostics are best-effort only and must not
                        # mask the real assertion below.
                        print('exception in diagnostic code: %s' % e, file=sys.stderr)
            self.assertEqual(exp, actual)
        finally:
            # Close the handler even when the assertion fails so that the
            # log file is not left open.
            rh.close()
    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
5146
5147
@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
class NTEventLogHandlerTest(BaseTest):
    """Tests for logging.handlers.NTEventLogHandler (needs the win32 modules)."""

    def test_basic(self):
        # Log one record through the handler, then look for it among the
        # records read back from the 'Application' event log.
        log_name = 'Application'
        log_handle = win32evtlog.OpenEventLog(None, log_name)
        count_before = win32evtlog.GetNumberOfEventLogRecords(log_handle)

        try:
            handler = logging.handlers.NTEventLogHandler('test_logging')
        except pywintypes.error as exc:
            # Anything other than "access denied" (winerror 5) is a real error.
            if exc.winerror != 5:
                raise
            raise unittest.SkipTest('Insufficient privileges to run test')

        record = logging.makeLogRecord({'msg': 'Test Log Message'})
        handler.handle(record)
        handler.close()
        # Now see if the event is recorded
        count_after = win32evtlog.GetNumberOfEventLogRecords(log_handle)
        self.assertLess(count_before, count_after)
        read_flags = (win32evtlog.EVENTLOG_BACKWARDS_READ |
                      win32evtlog.EVENTLOG_SEQUENTIAL_READ)
        GO_BACK = 100
        events = win32evtlog.ReadEventLog(log_handle, read_flags, GO_BACK)
        # Stop at the first event from our source carrying our message; the
        # message is only formatted for events whose source name matches.
        found = any(
            event.SourceName == 'test_logging' and
            win32evtlogutil.SafeFormatMessage(event, log_name) == 'Test Log Message\r\n'
            for event in events)
        failure_text = 'Record not found in event log, went back %d records' % GO_BACK
        self.assertTrue(found, msg=failure_text)
5182
5183
class MiscTestCase(unittest.TestCase):
    """Miscellaneous module-level checks for the logging package."""

    def test__all__(self):
        # Names handed to support.check__all__ as its blacklist for the
        # check of logging.__all__.
        skipped_names = {
            'logThreads', 'logMultiprocessing', 'logProcesses',
            'currentframe',
            'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
            'Filterer', 'PlaceHolder', 'Manager', 'RootLogger',
            'root', 'threading',
        }
        support.check__all__(self, logging, blacklist=skipped_names)
5192
5193
5194# Set the locale to the platform-dependent default.  I have no idea
5195# why the test does this, but in any case we save the current locale
5196# first and restore it at the end.
@support.run_with_locale('LC_ALL', '')
def test_main():
    """Run the test case classes of this module via support.run_unittest."""
    always_run = (
        BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
        HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
        DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
        ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest,
        StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
        QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
        LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest,
        RotatingFileHandlerTest, LastResortTest, LogRecordTest,
        ExceptionTest, SysLogHandlerTest, IPv6SysLogHandlerTest, HTTPHandlerTest,
        NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
        MiscTestCase,
    )
    # QueueListenerTest is only run when logging.handlers has QueueListener.
    if hasattr(logging.handlers, 'QueueListener'):
        conditional = (QueueListenerTest,)
    else:
        conditional = ()
    support.run_unittest(*(always_run + conditional))
5216
# Run the whole suite through test_main() when this file is executed as a
# script; nothing happens here on a plain import.
if __name__ == "__main__":
    test_main()
5219