• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import copy
29import datetime
30import pathlib
31import pickle
32import io
33import gc
34import json
35import os
36import queue
37import random
38import re
39import signal
40import socket
41import struct
42import sys
43import tempfile
44from test.support.script_helper import assert_python_ok, assert_python_failure
45from test import support
46import textwrap
47import threading
48import time
49import unittest
50import warnings
51import weakref
52
53import asyncore
54from http.server import HTTPServer, BaseHTTPRequestHandler
55import smtpd
56from urllib.parse import urlparse, parse_qs
57from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
58                          ThreadingTCPServer, StreamRequestHandler)
59
60try:
61    import win32evtlog, win32evtlogutil, pywintypes
62except ImportError:
63    win32evtlog = win32evtlogutil = pywintypes = None
64
65try:
66    import zlib
67except ImportError:
68    pass
69
70class BaseTest(unittest.TestCase):
71
72    """Base class for logging tests."""
73
74    log_format = "%(name)s -> %(levelname)s: %(message)s"
75    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
76    message_num = 0
77
78    def setUp(self):
79        """Setup the default logging stream to an internal StringIO instance,
80        so that we can examine log output as we want."""
81        self._threading_key = support.threading_setup()
82
83        logger_dict = logging.getLogger().manager.loggerDict
84        logging._acquireLock()
85        try:
86            self.saved_handlers = logging._handlers.copy()
87            self.saved_handler_list = logging._handlerList[:]
88            self.saved_loggers = saved_loggers = logger_dict.copy()
89            self.saved_name_to_level = logging._nameToLevel.copy()
90            self.saved_level_to_name = logging._levelToName.copy()
91            self.logger_states = logger_states = {}
92            for name in saved_loggers:
93                logger_states[name] = getattr(saved_loggers[name],
94                                              'disabled', None)
95        finally:
96            logging._releaseLock()
97
98        # Set two unused loggers
99        self.logger1 = logging.getLogger("\xab\xd7\xbb")
100        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
101
102        self.root_logger = logging.getLogger("")
103        self.original_logging_level = self.root_logger.getEffectiveLevel()
104
105        self.stream = io.StringIO()
106        self.root_logger.setLevel(logging.DEBUG)
107        self.root_hdlr = logging.StreamHandler(self.stream)
108        self.root_formatter = logging.Formatter(self.log_format)
109        self.root_hdlr.setFormatter(self.root_formatter)
110        if self.logger1.hasHandlers():
111            hlist = self.logger1.handlers + self.root_logger.handlers
112            raise AssertionError('Unexpected handlers: %s' % hlist)
113        if self.logger2.hasHandlers():
114            hlist = self.logger2.handlers + self.root_logger.handlers
115            raise AssertionError('Unexpected handlers: %s' % hlist)
116        self.root_logger.addHandler(self.root_hdlr)
117        self.assertTrue(self.logger1.hasHandlers())
118        self.assertTrue(self.logger2.hasHandlers())
119
120    def tearDown(self):
121        """Remove our logging stream, and restore the original logging
122        level."""
123        self.stream.close()
124        self.root_logger.removeHandler(self.root_hdlr)
125        while self.root_logger.handlers:
126            h = self.root_logger.handlers[0]
127            self.root_logger.removeHandler(h)
128            h.close()
129        self.root_logger.setLevel(self.original_logging_level)
130        logging._acquireLock()
131        try:
132            logging._levelToName.clear()
133            logging._levelToName.update(self.saved_level_to_name)
134            logging._nameToLevel.clear()
135            logging._nameToLevel.update(self.saved_name_to_level)
136            logging._handlers.clear()
137            logging._handlers.update(self.saved_handlers)
138            logging._handlerList[:] = self.saved_handler_list
139            manager = logging.getLogger().manager
140            manager.disable = 0
141            loggerDict = manager.loggerDict
142            loggerDict.clear()
143            loggerDict.update(self.saved_loggers)
144            logger_states = self.logger_states
145            for name in self.logger_states:
146                if logger_states[name] is not None:
147                    self.saved_loggers[name].disabled = logger_states[name]
148        finally:
149            logging._releaseLock()
150
151        self.doCleanups()
152        support.threading_cleanup(*self._threading_key)
153
154    def assert_log_lines(self, expected_values, stream=None, pat=None):
155        """Match the collected log lines against the regular expression
156        self.expected_log_pat, and compare the extracted group values to
157        the expected_values list of tuples."""
158        stream = stream or self.stream
159        pat = re.compile(pat or self.expected_log_pat)
160        actual_lines = stream.getvalue().splitlines()
161        self.assertEqual(len(actual_lines), len(expected_values))
162        for actual, expected in zip(actual_lines, expected_values):
163            match = pat.search(actual)
164            if not match:
165                self.fail("Log line does not match expected pattern:\n" +
166                            actual)
167            self.assertEqual(tuple(match.groups()), expected)
168        s = stream.read()
169        if s:
170            self.fail("Remaining output at end of log stream:\n" + s)
171
172    def next_message(self):
173        """Generate a message consisting solely of an auto-incrementing
174        integer."""
175        self.message_num += 1
176        return "%d" % self.message_num
177
178
179class BuiltinLevelsTest(BaseTest):
180    """Test builtin levels and their inheritance."""
181
182    def test_flat(self):
183        # Logging levels in a flat logger namespace.
184        m = self.next_message
185
186        ERR = logging.getLogger("ERR")
187        ERR.setLevel(logging.ERROR)
188        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
189        INF.setLevel(logging.INFO)
190        DEB = logging.getLogger("DEB")
191        DEB.setLevel(logging.DEBUG)
192
193        # These should log.
194        ERR.log(logging.CRITICAL, m())
195        ERR.error(m())
196
197        INF.log(logging.CRITICAL, m())
198        INF.error(m())
199        INF.warning(m())
200        INF.info(m())
201
202        DEB.log(logging.CRITICAL, m())
203        DEB.error(m())
204        DEB.warning(m())
205        DEB.info(m())
206        DEB.debug(m())
207
208        # These should not log.
209        ERR.warning(m())
210        ERR.info(m())
211        ERR.debug(m())
212
213        INF.debug(m())
214
215        self.assert_log_lines([
216            ('ERR', 'CRITICAL', '1'),
217            ('ERR', 'ERROR', '2'),
218            ('INF', 'CRITICAL', '3'),
219            ('INF', 'ERROR', '4'),
220            ('INF', 'WARNING', '5'),
221            ('INF', 'INFO', '6'),
222            ('DEB', 'CRITICAL', '7'),
223            ('DEB', 'ERROR', '8'),
224            ('DEB', 'WARNING', '9'),
225            ('DEB', 'INFO', '10'),
226            ('DEB', 'DEBUG', '11'),
227        ])
228
229    def test_nested_explicit(self):
230        # Logging levels in a nested namespace, all explicitly set.
231        m = self.next_message
232
233        INF = logging.getLogger("INF")
234        INF.setLevel(logging.INFO)
235        INF_ERR  = logging.getLogger("INF.ERR")
236        INF_ERR.setLevel(logging.ERROR)
237
238        # These should log.
239        INF_ERR.log(logging.CRITICAL, m())
240        INF_ERR.error(m())
241
242        # These should not log.
243        INF_ERR.warning(m())
244        INF_ERR.info(m())
245        INF_ERR.debug(m())
246
247        self.assert_log_lines([
248            ('INF.ERR', 'CRITICAL', '1'),
249            ('INF.ERR', 'ERROR', '2'),
250        ])
251
252    def test_nested_inherited(self):
253        # Logging levels in a nested namespace, inherited from parent loggers.
254        m = self.next_message
255
256        INF = logging.getLogger("INF")
257        INF.setLevel(logging.INFO)
258        INF_ERR  = logging.getLogger("INF.ERR")
259        INF_ERR.setLevel(logging.ERROR)
260        INF_UNDEF = logging.getLogger("INF.UNDEF")
261        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
262        UNDEF = logging.getLogger("UNDEF")
263
264        # These should log.
265        INF_UNDEF.log(logging.CRITICAL, m())
266        INF_UNDEF.error(m())
267        INF_UNDEF.warning(m())
268        INF_UNDEF.info(m())
269        INF_ERR_UNDEF.log(logging.CRITICAL, m())
270        INF_ERR_UNDEF.error(m())
271
272        # These should not log.
273        INF_UNDEF.debug(m())
274        INF_ERR_UNDEF.warning(m())
275        INF_ERR_UNDEF.info(m())
276        INF_ERR_UNDEF.debug(m())
277
278        self.assert_log_lines([
279            ('INF.UNDEF', 'CRITICAL', '1'),
280            ('INF.UNDEF', 'ERROR', '2'),
281            ('INF.UNDEF', 'WARNING', '3'),
282            ('INF.UNDEF', 'INFO', '4'),
283            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
284            ('INF.ERR.UNDEF', 'ERROR', '6'),
285        ])
286
287    def test_nested_with_virtual_parent(self):
288        # Logging levels when some parent does not exist yet.
289        m = self.next_message
290
291        INF = logging.getLogger("INF")
292        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
293        CHILD = logging.getLogger("INF.BADPARENT")
294        INF.setLevel(logging.INFO)
295
296        # These should log.
297        GRANDCHILD.log(logging.FATAL, m())
298        GRANDCHILD.info(m())
299        CHILD.log(logging.FATAL, m())
300        CHILD.info(m())
301
302        # These should not log.
303        GRANDCHILD.debug(m())
304        CHILD.debug(m())
305
306        self.assert_log_lines([
307            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
308            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
309            ('INF.BADPARENT', 'CRITICAL', '3'),
310            ('INF.BADPARENT', 'INFO', '4'),
311        ])
312
313    def test_regression_22386(self):
314        """See issue #22386 for more information."""
315        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
316        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
317
318    def test_issue27935(self):
319        fatal = logging.getLevelName('FATAL')
320        self.assertEqual(fatal, logging.FATAL)
321
322    def test_regression_29220(self):
323        """See issue #29220 for more information."""
324        logging.addLevelName(logging.INFO, '')
325        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
326        self.assertEqual(logging.getLevelName(logging.INFO), '')
327        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
328        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
329
330class BasicFilterTest(BaseTest):
331
332    """Test the bundled Filter class."""
333
334    def test_filter(self):
335        # Only messages satisfying the specified criteria pass through the
336        #  filter.
337        filter_ = logging.Filter("spam.eggs")
338        handler = self.root_logger.handlers[0]
339        try:
340            handler.addFilter(filter_)
341            spam = logging.getLogger("spam")
342            spam_eggs = logging.getLogger("spam.eggs")
343            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
344            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
345
346            spam.info(self.next_message())
347            spam_eggs.info(self.next_message())  # Good.
348            spam_eggs_fish.info(self.next_message())  # Good.
349            spam_bakedbeans.info(self.next_message())
350
351            self.assert_log_lines([
352                ('spam.eggs', 'INFO', '2'),
353                ('spam.eggs.fish', 'INFO', '3'),
354            ])
355        finally:
356            handler.removeFilter(filter_)
357
358    def test_callable_filter(self):
359        # Only messages satisfying the specified criteria pass through the
360        #  filter.
361
362        def filterfunc(record):
363            parts = record.name.split('.')
364            prefix = '.'.join(parts[:2])
365            return prefix == 'spam.eggs'
366
367        handler = self.root_logger.handlers[0]
368        try:
369            handler.addFilter(filterfunc)
370            spam = logging.getLogger("spam")
371            spam_eggs = logging.getLogger("spam.eggs")
372            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
373            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
374
375            spam.info(self.next_message())
376            spam_eggs.info(self.next_message())  # Good.
377            spam_eggs_fish.info(self.next_message())  # Good.
378            spam_bakedbeans.info(self.next_message())
379
380            self.assert_log_lines([
381                ('spam.eggs', 'INFO', '2'),
382                ('spam.eggs.fish', 'INFO', '3'),
383            ])
384        finally:
385            handler.removeFilter(filterfunc)
386
387    def test_empty_filter(self):
388        f = logging.Filter()
389        r = logging.makeLogRecord({'name': 'spam.eggs'})
390        self.assertTrue(f.filter(r))
391
392#
393#   First, we define our levels. There can be as many as you want - the only
394#     limitations are that they should be integers, the lowest should be > 0 and
395#   larger values mean less information being logged. If you need specific
396#   level values which do not fit into these limitations, you can use a
397#   mapping dictionary to convert between your application levels and the
398#   logging system.
399#
400SILENT      = 120
401TACITURN    = 119
402TERSE       = 118
403EFFUSIVE    = 117
404SOCIABLE    = 116
405VERBOSE     = 115
406TALKATIVE   = 114
407GARRULOUS   = 113
408CHATTERBOX  = 112
409BORING      = 111
410
411LEVEL_RANGE = range(BORING, SILENT + 1)
412
413#
414#   Next, we define names for our levels. You don't need to do this - in which
415#   case the system will use "Level n" to denote the text for the level.
416#
417my_logging_levels = {
418    SILENT      : 'Silent',
419    TACITURN    : 'Taciturn',
420    TERSE       : 'Terse',
421    EFFUSIVE    : 'Effusive',
422    SOCIABLE    : 'Sociable',
423    VERBOSE     : 'Verbose',
424    TALKATIVE   : 'Talkative',
425    GARRULOUS   : 'Garrulous',
426    CHATTERBOX  : 'Chatterbox',
427    BORING      : 'Boring',
428}
429
430class GarrulousFilter(logging.Filter):
431
432    """A filter which blocks garrulous messages."""
433
434    def filter(self, record):
435        return record.levelno != GARRULOUS
436
437class VerySpecificFilter(logging.Filter):
438
439    """A filter which blocks sociable and taciturn messages."""
440
441    def filter(self, record):
442        return record.levelno not in [SOCIABLE, TACITURN]
443
444
445class CustomLevelsAndFiltersTest(BaseTest):
446
447    """Test various filtering possibilities with custom logging levels."""
448
449    # Skip the logger name group.
450    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
451
452    def setUp(self):
453        BaseTest.setUp(self)
454        for k, v in my_logging_levels.items():
455            logging.addLevelName(k, v)
456
457    def log_at_all_levels(self, logger):
458        for lvl in LEVEL_RANGE:
459            logger.log(lvl, self.next_message())
460
461    def test_logger_filter(self):
462        # Filter at logger level.
463        self.root_logger.setLevel(VERBOSE)
464        # Levels >= 'Verbose' are good.
465        self.log_at_all_levels(self.root_logger)
466        self.assert_log_lines([
467            ('Verbose', '5'),
468            ('Sociable', '6'),
469            ('Effusive', '7'),
470            ('Terse', '8'),
471            ('Taciturn', '9'),
472            ('Silent', '10'),
473        ])
474
475    def test_handler_filter(self):
476        # Filter at handler level.
477        self.root_logger.handlers[0].setLevel(SOCIABLE)
478        try:
479            # Levels >= 'Sociable' are good.
480            self.log_at_all_levels(self.root_logger)
481            self.assert_log_lines([
482                ('Sociable', '6'),
483                ('Effusive', '7'),
484                ('Terse', '8'),
485                ('Taciturn', '9'),
486                ('Silent', '10'),
487            ])
488        finally:
489            self.root_logger.handlers[0].setLevel(logging.NOTSET)
490
491    def test_specific_filters(self):
492        # Set a specific filter object on the handler, and then add another
493        #  filter object on the logger itself.
494        handler = self.root_logger.handlers[0]
495        specific_filter = None
496        garr = GarrulousFilter()
497        handler.addFilter(garr)
498        try:
499            self.log_at_all_levels(self.root_logger)
500            first_lines = [
501                # Notice how 'Garrulous' is missing
502                ('Boring', '1'),
503                ('Chatterbox', '2'),
504                ('Talkative', '4'),
505                ('Verbose', '5'),
506                ('Sociable', '6'),
507                ('Effusive', '7'),
508                ('Terse', '8'),
509                ('Taciturn', '9'),
510                ('Silent', '10'),
511            ]
512            self.assert_log_lines(first_lines)
513
514            specific_filter = VerySpecificFilter()
515            self.root_logger.addFilter(specific_filter)
516            self.log_at_all_levels(self.root_logger)
517            self.assert_log_lines(first_lines + [
518                # Not only 'Garrulous' is still missing, but also 'Sociable'
519                # and 'Taciturn'
520                ('Boring', '11'),
521                ('Chatterbox', '12'),
522                ('Talkative', '14'),
523                ('Verbose', '15'),
524                ('Effusive', '17'),
525                ('Terse', '18'),
526                ('Silent', '20'),
527        ])
528        finally:
529            if specific_filter:
530                self.root_logger.removeFilter(specific_filter)
531            handler.removeFilter(garr)
532
533
534class HandlerTest(BaseTest):
535    def test_name(self):
536        h = logging.Handler()
537        h.name = 'generic'
538        self.assertEqual(h.name, 'generic')
539        h.name = 'anothergeneric'
540        self.assertEqual(h.name, 'anothergeneric')
541        self.assertRaises(NotImplementedError, h.emit, None)
542
543    def test_builtin_handlers(self):
544        # We can't actually *use* too many handlers in the tests,
545        # but we can try instantiating them with various options
546        if sys.platform in ('linux', 'darwin'):
547            for existing in (True, False):
548                fd, fn = tempfile.mkstemp()
549                os.close(fd)
550                if not existing:
551                    os.unlink(fn)
552                h = logging.handlers.WatchedFileHandler(fn, delay=True)
553                if existing:
554                    dev, ino = h.dev, h.ino
555                    self.assertEqual(dev, -1)
556                    self.assertEqual(ino, -1)
557                    r = logging.makeLogRecord({'msg': 'Test'})
558                    h.handle(r)
559                    # Now remove the file.
560                    os.unlink(fn)
561                    self.assertFalse(os.path.exists(fn))
562                    # The next call should recreate the file.
563                    h.handle(r)
564                    self.assertTrue(os.path.exists(fn))
565                else:
566                    self.assertEqual(h.dev, -1)
567                    self.assertEqual(h.ino, -1)
568                h.close()
569                if existing:
570                    os.unlink(fn)
571            if sys.platform == 'darwin':
572                sockname = '/var/run/syslog'
573            else:
574                sockname = '/dev/log'
575            try:
576                h = logging.handlers.SysLogHandler(sockname)
577                self.assertEqual(h.facility, h.LOG_USER)
578                self.assertTrue(h.unixsocket)
579                h.close()
580            except OSError: # syslogd might not be available
581                pass
582        for method in ('GET', 'POST', 'PUT'):
583            if method == 'PUT':
584                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
585                                  'localhost', '/log', method)
586            else:
587                h = logging.handlers.HTTPHandler('localhost', '/log', method)
588                h.close()
589        h = logging.handlers.BufferingHandler(0)
590        r = logging.makeLogRecord({})
591        self.assertTrue(h.shouldFlush(r))
592        h.close()
593        h = logging.handlers.BufferingHandler(1)
594        self.assertFalse(h.shouldFlush(r))
595        h.close()
596
597    def test_path_objects(self):
598        """
599        Test that Path objects are accepted as filename arguments to handlers.
600
601        See Issue #27493.
602        """
603        fd, fn = tempfile.mkstemp()
604        os.close(fd)
605        os.unlink(fn)
606        pfn = pathlib.Path(fn)
607        cases = (
608                    (logging.FileHandler, (pfn, 'w')),
609                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
610                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
611                )
612        if sys.platform in ('linux', 'darwin'):
613            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
614        for cls, args in cases:
615            h = cls(*args)
616            self.assertTrue(os.path.exists(fn))
617            h.close()
618            os.unlink(fn)
619
620    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
621    def test_race(self):
622        # Issue #14632 refers.
623        def remove_loop(fname, tries):
624            for _ in range(tries):
625                try:
626                    os.unlink(fname)
627                    self.deletion_time = time.time()
628                except OSError:
629                    pass
630                time.sleep(0.004 * random.randint(0, 4))
631
632        del_count = 500
633        log_count = 500
634
635        self.handle_time = None
636        self.deletion_time = None
637
638        for delay in (False, True):
639            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
640            os.close(fd)
641            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
642            remover.daemon = True
643            remover.start()
644            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
645            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
646            h.setFormatter(f)
647            try:
648                for _ in range(log_count):
649                    time.sleep(0.005)
650                    r = logging.makeLogRecord({'msg': 'testing' })
651                    try:
652                        self.handle_time = time.time()
653                        h.handle(r)
654                    except Exception:
655                        print('Deleted at %s, '
656                              'opened at %s' % (self.deletion_time,
657                                                self.handle_time))
658                        raise
659            finally:
660                remover.join()
661                h.close()
662                if os.path.exists(fn):
663                    os.unlink(fn)
664
665    # The implementation relies on os.register_at_fork existing, but we test
666    # based on os.fork existing because that is what users and this test use.
667    # This helps ensure that when fork exists (the important concept) that the
668    # register_at_fork mechanism is also present and used.
669    @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
670    def test_post_fork_child_no_deadlock(self):
671        """Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
672        class _OurHandler(logging.Handler):
673            def __init__(self):
674                super().__init__()
675                self.sub_handler = logging.StreamHandler(
676                    stream=open('/dev/null', 'wt'))
677
678            def emit(self, record):
679                self.sub_handler.acquire()
680                try:
681                    self.sub_handler.emit(record)
682                finally:
683                    self.sub_handler.release()
684
685        self.assertEqual(len(logging._handlers), 0)
686        refed_h = _OurHandler()
687        self.addCleanup(refed_h.sub_handler.stream.close)
688        refed_h.name = 'because we need at least one for this test'
689        self.assertGreater(len(logging._handlers), 0)
690        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
691        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
692        test_logger.addHandler(refed_h)
693        test_logger.setLevel(logging.DEBUG)
694
695        locks_held__ready_to_fork = threading.Event()
696        fork_happened__release_locks_and_end_thread = threading.Event()
697
698        def lock_holder_thread_fn():
699            logging._acquireLock()
700            try:
701                refed_h.acquire()
702                try:
703                    # Tell the main thread to do the fork.
704                    locks_held__ready_to_fork.set()
705
706                    # If the deadlock bug exists, the fork will happen
707                    # without dealing with the locks we hold, deadlocking
708                    # the child.
709
710                    # Wait for a successful fork or an unreasonable amount of
711                    # time before releasing our locks.  To avoid a timing based
712                    # test we'd need communication from os.fork() as to when it
713                    # has actually happened.  Given this is a regression test
714                    # for a fixed issue, potentially less reliably detecting
715                    # regression via timing is acceptable for simplicity.
716                    # The test will always take at least this long. :(
717                    fork_happened__release_locks_and_end_thread.wait(0.5)
718                finally:
719                    refed_h.release()
720            finally:
721                logging._releaseLock()
722
723        lock_holder_thread = threading.Thread(
724                target=lock_holder_thread_fn,
725                name='test_post_fork_child_no_deadlock lock holder')
726        lock_holder_thread.start()
727
728        locks_held__ready_to_fork.wait()
729        pid = os.fork()
730        if pid == 0:  # Child.
731            try:
732                test_logger.info(r'Child process did not deadlock. \o/')
733            finally:
734                os._exit(0)
735        else:  # Parent.
736            test_logger.info(r'Parent process returned from fork. \o/')
737            fork_happened__release_locks_and_end_thread.set()
738            lock_holder_thread.join()
739            start_time = time.monotonic()
740            while True:
741                test_logger.debug('Waiting for child process.')
742                waited_pid, status = os.waitpid(pid, os.WNOHANG)
743                if waited_pid == pid:
744                    break  # child process exited.
745                if time.monotonic() - start_time > 7:
746                    break  # so long? implies child deadlock.
747                time.sleep(0.05)
748            test_logger.debug('Done waiting.')
749            if waited_pid != pid:
750                os.kill(pid, signal.SIGKILL)
751                waited_pid, status = os.waitpid(pid, 0)
752                self.fail("child process deadlocked.")
753            self.assertEqual(status, 0, msg="child process error")
754
755
756class BadStream(object):
757    def write(self, data):
758        raise RuntimeError('deliberate mistake')
759
760class TestStreamHandler(logging.StreamHandler):
761    def handleError(self, record):
762        self.error_record = record
763
764class StreamWithIntName(object):
765    level = logging.NOTSET
766    name = 2
767
768class StreamHandlerTest(BaseTest):
769    def test_error_handling(self):
770        h = TestStreamHandler(BadStream())
771        r = logging.makeLogRecord({})
772        old_raise = logging.raiseExceptions
773
774        try:
775            h.handle(r)
776            self.assertIs(h.error_record, r)
777
778            h = logging.StreamHandler(BadStream())
779            with support.captured_stderr() as stderr:
780                h.handle(r)
781                msg = '\nRuntimeError: deliberate mistake\n'
782                self.assertIn(msg, stderr.getvalue())
783
784            logging.raiseExceptions = False
785            with support.captured_stderr() as stderr:
786                h.handle(r)
787                self.assertEqual('', stderr.getvalue())
788        finally:
789            logging.raiseExceptions = old_raise
790
791    def test_stream_setting(self):
792        """
793        Test setting the handler's stream
794        """
795        h = logging.StreamHandler()
796        stream = io.StringIO()
797        old = h.setStream(stream)
798        self.assertIs(old, sys.stderr)
799        actual = h.setStream(old)
800        self.assertIs(actual, stream)
801        # test that setting to existing value returns None
802        actual = h.setStream(old)
803        self.assertIsNone(actual)
804
805    def test_can_represent_stream_with_int_name(self):
806        h = logging.StreamHandler(StreamWithIntName())
807        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')
808
809# -- The following section could be moved into a server_helper.py module
810# -- if it proves to be of wider utility than just test_logging
811
812class TestSMTPServer(smtpd.SMTPServer):
813    """
814    This class implements a test SMTP server.
815
816    :param addr: A (host, port) tuple which the server listens on.
817                 You can specify a port value of zero: the server's
818                 *port* attribute will hold the actual port number
819                 used, which can be used in client connections.
820    :param handler: A callable which will be called to process
821                    incoming messages. The handler will be passed
822                    the client address tuple, who the message is from,
823                    a list of recipients and the message data.
824    :param poll_interval: The interval, in seconds, used in the underlying
825                          :func:`select` or :func:`poll` call by
826                          :func:`asyncore.loop`.
827    :param sockmap: A dictionary which will be used to hold
828                    :class:`asyncore.dispatcher` instances used by
829                    :func:`asyncore.loop`. This avoids changing the
830                    :mod:`asyncore` module's global state.
831    """
832
833    def __init__(self, addr, handler, poll_interval, sockmap):
834        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
835                                  decode_data=True)
836        self.port = self.socket.getsockname()[1]
837        self._handler = handler
838        self._thread = None
839        self.poll_interval = poll_interval
840
841    def process_message(self, peer, mailfrom, rcpttos, data):
842        """
843        Delegates to the handler passed in to the server's constructor.
844
845        Typically, this will be a test case method.
846        :param peer: The client (host, port) tuple.
847        :param mailfrom: The address of the sender.
848        :param rcpttos: The addresses of the recipients.
849        :param data: The message.
850        """
851        self._handler(peer, mailfrom, rcpttos, data)
852
853    def start(self):
854        """
855        Start the server running on a separate daemon thread.
856        """
857        self._thread = t = threading.Thread(target=self.serve_forever,
858                                            args=(self.poll_interval,))
859        t.setDaemon(True)
860        t.start()
861
862    def serve_forever(self, poll_interval):
863        """
864        Run the :mod:`asyncore` loop until normal termination
865        conditions arise.
866        :param poll_interval: The interval, in seconds, used in the underlying
867                              :func:`select` or :func:`poll` call by
868                              :func:`asyncore.loop`.
869        """
870        asyncore.loop(poll_interval, map=self._map)
871
872    def stop(self, timeout=None):
873        """
874        Stop the thread by closing the server instance.
875        Wait for the server thread to terminate.
876
877        :param timeout: How long to wait for the server thread
878                        to terminate.
879        """
880        self.close()
881        support.join_thread(self._thread, timeout)
882        self._thread = None
883        asyncore.close_all(map=self._map, ignore_all=True)
884
885
886class ControlMixin(object):
887    """
888    This mixin is used to start a server on a separate thread, and
889    shut it down programmatically. Request handling is simplified - instead
890    of needing to derive a suitable RequestHandler subclass, you just
891    provide a callable which will be passed each received request to be
892    processed.
893
894    :param handler: A handler callable which will be called with a
895                    single parameter - the request - in order to
896                    process the request. This handler is called on the
897                    server thread, effectively meaning that requests are
898                    processed serially. While not quite Web scale ;-),
899                    this should be fine for testing applications.
900    :param poll_interval: The polling interval in seconds.
901    """
902    def __init__(self, handler, poll_interval):
903        self._thread = None
904        self.poll_interval = poll_interval
905        self._handler = handler
906        self.ready = threading.Event()
907
908    def start(self):
909        """
910        Create a daemon thread to run the server, and start it.
911        """
912        self._thread = t = threading.Thread(target=self.serve_forever,
913                                            args=(self.poll_interval,))
914        t.setDaemon(True)
915        t.start()
916
917    def serve_forever(self, poll_interval):
918        """
919        Run the server. Set the ready flag before entering the
920        service loop.
921        """
922        self.ready.set()
923        super(ControlMixin, self).serve_forever(poll_interval)
924
925    def stop(self, timeout=None):
926        """
927        Tell the server thread to stop, and wait for it to do so.
928
929        :param timeout: How long to wait for the server thread
930                        to terminate.
931        """
932        self.shutdown()
933        if self._thread is not None:
934            support.join_thread(self._thread, timeout)
935            self._thread = None
936        self.server_close()
937        self.ready.clear()
938
939class TestHTTPServer(ControlMixin, HTTPServer):
940    """
941    An HTTP server which is controllable using :class:`ControlMixin`.
942
943    :param addr: A tuple with the IP address and port to listen on.
944    :param handler: A handler callable which will be called with a
945                    single parameter - the request - in order to
946                    process the request.
947    :param poll_interval: The polling interval in seconds.
948    :param log: Pass ``True`` to enable log messages.
949    """
950    def __init__(self, addr, handler, poll_interval=0.5,
951                 log=False, sslctx=None):
952        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
953            def __getattr__(self, name, default=None):
954                if name.startswith('do_'):
955                    return self.process_request
956                raise AttributeError(name)
957
958            def process_request(self):
959                self.server._handler(self)
960
961            def log_message(self, format, *args):
962                if log:
963                    super(DelegatingHTTPRequestHandler,
964                          self).log_message(format, *args)
965        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
966        ControlMixin.__init__(self, handler, poll_interval)
967        self.sslctx = sslctx
968
969    def get_request(self):
970        try:
971            sock, addr = self.socket.accept()
972            if self.sslctx:
973                sock = self.sslctx.wrap_socket(sock, server_side=True)
974        except OSError as e:
975            # socket errors are silenced by the caller, print them here
976            sys.stderr.write("Got an error:\n%s\n" % e)
977            raise
978        return sock, addr
979
980class TestTCPServer(ControlMixin, ThreadingTCPServer):
981    """
982    A TCP server which is controllable using :class:`ControlMixin`.
983
984    :param addr: A tuple with the IP address and port to listen on.
985    :param handler: A handler callable which will be called with a single
986                    parameter - the request - in order to process the request.
987    :param poll_interval: The polling interval in seconds.
988    :bind_and_activate: If True (the default), binds the server and starts it
989                        listening. If False, you need to call
990                        :meth:`server_bind` and :meth:`server_activate` at
991                        some later time before calling :meth:`start`, so that
992                        the server will set up the socket and listen on it.
993    """
994
995    allow_reuse_address = True
996
997    def __init__(self, addr, handler, poll_interval=0.5,
998                 bind_and_activate=True):
999        class DelegatingTCPRequestHandler(StreamRequestHandler):
1000
1001            def handle(self):
1002                self.server._handler(self)
1003        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
1004                                    bind_and_activate)
1005        ControlMixin.__init__(self, handler, poll_interval)
1006
1007    def server_bind(self):
1008        super(TestTCPServer, self).server_bind()
1009        self.port = self.socket.getsockname()[1]
1010
1011class TestUDPServer(ControlMixin, ThreadingUDPServer):
1012    """
1013    A UDP server which is controllable using :class:`ControlMixin`.
1014
1015    :param addr: A tuple with the IP address and port to listen on.
1016    :param handler: A handler callable which will be called with a
1017                    single parameter - the request - in order to
1018                    process the request.
1019    :param poll_interval: The polling interval for shutdown requests,
1020                          in seconds.
1021    :bind_and_activate: If True (the default), binds the server and
1022                        starts it listening. If False, you need to
1023                        call :meth:`server_bind` and
1024                        :meth:`server_activate` at some later time
1025                        before calling :meth:`start`, so that the server will
1026                        set up the socket and listen on it.
1027    """
1028    def __init__(self, addr, handler, poll_interval=0.5,
1029                 bind_and_activate=True):
1030        class DelegatingUDPRequestHandler(DatagramRequestHandler):
1031
1032            def handle(self):
1033                self.server._handler(self)
1034
1035            def finish(self):
1036                data = self.wfile.getvalue()
1037                if data:
1038                    try:
1039                        super(DelegatingUDPRequestHandler, self).finish()
1040                    except OSError:
1041                        if not self.server._closed:
1042                            raise
1043
1044        ThreadingUDPServer.__init__(self, addr,
1045                                    DelegatingUDPRequestHandler,
1046                                    bind_and_activate)
1047        ControlMixin.__init__(self, handler, poll_interval)
1048        self._closed = False
1049
1050    def server_bind(self):
1051        super(TestUDPServer, self).server_bind()
1052        self.port = self.socket.getsockname()[1]
1053
1054    def server_close(self):
1055        super(TestUDPServer, self).server_close()
1056        self._closed = True
1057
1058if hasattr(socket, "AF_UNIX"):
1059    class TestUnixStreamServer(TestTCPServer):
1060        address_family = socket.AF_UNIX
1061
1062    class TestUnixDatagramServer(TestUDPServer):
1063        address_family = socket.AF_UNIX
1064
1065# - end of server_helper section
1066
1067class SMTPHandlerTest(BaseTest):
1068    # bpo-14314, bpo-19665, bpo-34092: don't wait forever, timeout of 1 minute
1069    TIMEOUT = 60.0
1070
1071    def test_basic(self):
1072        sockmap = {}
1073        server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
1074                                sockmap)
1075        server.start()
1076        addr = (support.HOST, server.port)
1077        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
1078                                         timeout=self.TIMEOUT)
1079        self.assertEqual(h.toaddrs, ['you'])
1080        self.messages = []
1081        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
1082        self.handled = threading.Event()
1083        h.handle(r)
1084        self.handled.wait(self.TIMEOUT)
1085        server.stop()
1086        self.assertTrue(self.handled.is_set())
1087        self.assertEqual(len(self.messages), 1)
1088        peer, mailfrom, rcpttos, data = self.messages[0]
1089        self.assertEqual(mailfrom, 'me')
1090        self.assertEqual(rcpttos, ['you'])
1091        self.assertIn('\nSubject: Log\n', data)
1092        self.assertTrue(data.endswith('\n\nHello \u2713'))
1093        h.close()
1094
1095    def process_message(self, *args):
1096        self.messages.append(args)
1097        self.handled.set()
1098
1099class MemoryHandlerTest(BaseTest):
1100
1101    """Tests for the MemoryHandler."""
1102
1103    # Do not bother with a logger name group.
1104    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
1105
1106    def setUp(self):
1107        BaseTest.setUp(self)
1108        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1109                                                       self.root_hdlr)
1110        self.mem_logger = logging.getLogger('mem')
1111        self.mem_logger.propagate = 0
1112        self.mem_logger.addHandler(self.mem_hdlr)
1113
1114    def tearDown(self):
1115        self.mem_hdlr.close()
1116        BaseTest.tearDown(self)
1117
1118    def test_flush(self):
1119        # The memory handler flushes to its target handler based on specific
1120        #  criteria (message count and message level).
1121        self.mem_logger.debug(self.next_message())
1122        self.assert_log_lines([])
1123        self.mem_logger.info(self.next_message())
1124        self.assert_log_lines([])
1125        # This will flush because the level is >= logging.WARNING
1126        self.mem_logger.warning(self.next_message())
1127        lines = [
1128            ('DEBUG', '1'),
1129            ('INFO', '2'),
1130            ('WARNING', '3'),
1131        ]
1132        self.assert_log_lines(lines)
1133        for n in (4, 14):
1134            for i in range(9):
1135                self.mem_logger.debug(self.next_message())
1136            self.assert_log_lines(lines)
1137            # This will flush because it's the 10th message since the last
1138            #  flush.
1139            self.mem_logger.debug(self.next_message())
1140            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
1141            self.assert_log_lines(lines)
1142
1143        self.mem_logger.debug(self.next_message())
1144        self.assert_log_lines(lines)
1145
1146    def test_flush_on_close(self):
1147        """
1148        Test that the flush-on-close configuration works as expected.
1149        """
1150        self.mem_logger.debug(self.next_message())
1151        self.assert_log_lines([])
1152        self.mem_logger.info(self.next_message())
1153        self.assert_log_lines([])
1154        self.mem_logger.removeHandler(self.mem_hdlr)
1155        # Default behaviour is to flush on close. Check that it happens.
1156        self.mem_hdlr.close()
1157        lines = [
1158            ('DEBUG', '1'),
1159            ('INFO', '2'),
1160        ]
1161        self.assert_log_lines(lines)
1162        # Now configure for flushing not to be done on close.
1163        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1164                                                       self.root_hdlr,
1165                                                       False)
1166        self.mem_logger.addHandler(self.mem_hdlr)
1167        self.mem_logger.debug(self.next_message())
1168        self.assert_log_lines(lines)  # no change
1169        self.mem_logger.info(self.next_message())
1170        self.assert_log_lines(lines)  # no change
1171        self.mem_logger.removeHandler(self.mem_hdlr)
1172        self.mem_hdlr.close()
1173        # assert that no new lines have been added
1174        self.assert_log_lines(lines)  # no change
1175
1176
1177class ExceptionFormatter(logging.Formatter):
1178    """A special exception formatter."""
1179    def formatException(self, ei):
1180        return "Got a [%s]" % ei[0].__name__
1181
1182
1183class ConfigFileTest(BaseTest):
1184
1185    """Reading logging config from a .ini-style config file."""
1186
1187    check_no_resource_warning = support.check_no_resource_warning
1188    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1189
1190    # config0 is a standard configuration.
1191    config0 = """
1192    [loggers]
1193    keys=root
1194
1195    [handlers]
1196    keys=hand1
1197
1198    [formatters]
1199    keys=form1
1200
1201    [logger_root]
1202    level=WARNING
1203    handlers=hand1
1204
1205    [handler_hand1]
1206    class=StreamHandler
1207    level=NOTSET
1208    formatter=form1
1209    args=(sys.stdout,)
1210
1211    [formatter_form1]
1212    format=%(levelname)s ++ %(message)s
1213    datefmt=
1214    """
1215
1216    # config1 adds a little to the standard configuration.
1217    config1 = """
1218    [loggers]
1219    keys=root,parser
1220
1221    [handlers]
1222    keys=hand1
1223
1224    [formatters]
1225    keys=form1
1226
1227    [logger_root]
1228    level=WARNING
1229    handlers=
1230
1231    [logger_parser]
1232    level=DEBUG
1233    handlers=hand1
1234    propagate=1
1235    qualname=compiler.parser
1236
1237    [handler_hand1]
1238    class=StreamHandler
1239    level=NOTSET
1240    formatter=form1
1241    args=(sys.stdout,)
1242
1243    [formatter_form1]
1244    format=%(levelname)s ++ %(message)s
1245    datefmt=
1246    """
1247
1248    # config1a moves the handler to the root.
1249    config1a = """
1250    [loggers]
1251    keys=root,parser
1252
1253    [handlers]
1254    keys=hand1
1255
1256    [formatters]
1257    keys=form1
1258
1259    [logger_root]
1260    level=WARNING
1261    handlers=hand1
1262
1263    [logger_parser]
1264    level=DEBUG
1265    handlers=
1266    propagate=1
1267    qualname=compiler.parser
1268
1269    [handler_hand1]
1270    class=StreamHandler
1271    level=NOTSET
1272    formatter=form1
1273    args=(sys.stdout,)
1274
1275    [formatter_form1]
1276    format=%(levelname)s ++ %(message)s
1277    datefmt=
1278    """
1279
1280    # config2 has a subtle configuration error that should be reported
1281    config2 = config1.replace("sys.stdout", "sys.stbout")
1282
1283    # config3 has a less subtle configuration error
1284    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1285
1286    # config4 specifies a custom formatter class to be loaded
1287    config4 = """
1288    [loggers]
1289    keys=root
1290
1291    [handlers]
1292    keys=hand1
1293
1294    [formatters]
1295    keys=form1
1296
1297    [logger_root]
1298    level=NOTSET
1299    handlers=hand1
1300
1301    [handler_hand1]
1302    class=StreamHandler
1303    level=NOTSET
1304    formatter=form1
1305    args=(sys.stdout,)
1306
1307    [formatter_form1]
1308    class=""" + __name__ + """.ExceptionFormatter
1309    format=%(levelname)s:%(name)s:%(message)s
1310    datefmt=
1311    """
1312
1313    # config5 specifies a custom handler class to be loaded
1314    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1315
1316    # config6 uses ', ' delimiters in the handlers and formatters sections
1317    config6 = """
1318    [loggers]
1319    keys=root,parser
1320
1321    [handlers]
1322    keys=hand1, hand2
1323
1324    [formatters]
1325    keys=form1, form2
1326
1327    [logger_root]
1328    level=WARNING
1329    handlers=
1330
1331    [logger_parser]
1332    level=DEBUG
1333    handlers=hand1
1334    propagate=1
1335    qualname=compiler.parser
1336
1337    [handler_hand1]
1338    class=StreamHandler
1339    level=NOTSET
1340    formatter=form1
1341    args=(sys.stdout,)
1342
1343    [handler_hand2]
1344    class=StreamHandler
1345    level=NOTSET
1346    formatter=form1
1347    args=(sys.stderr,)
1348
1349    [formatter_form1]
1350    format=%(levelname)s ++ %(message)s
1351    datefmt=
1352
1353    [formatter_form2]
1354    format=%(message)s
1355    datefmt=
1356    """
1357
1358    # config7 adds a compiler logger, and uses kwargs instead of args.
1359    config7 = """
1360    [loggers]
1361    keys=root,parser,compiler
1362
1363    [handlers]
1364    keys=hand1
1365
1366    [formatters]
1367    keys=form1
1368
1369    [logger_root]
1370    level=WARNING
1371    handlers=hand1
1372
1373    [logger_compiler]
1374    level=DEBUG
1375    handlers=
1376    propagate=1
1377    qualname=compiler
1378
1379    [logger_parser]
1380    level=DEBUG
1381    handlers=
1382    propagate=1
1383    qualname=compiler.parser
1384
1385    [handler_hand1]
1386    class=StreamHandler
1387    level=NOTSET
1388    formatter=form1
1389    kwargs={'stream': sys.stdout,}
1390
1391    [formatter_form1]
1392    format=%(levelname)s ++ %(message)s
1393    datefmt=
1394    """
1395
1396    # config 8, check for resource warning
1397    config8 = r"""
1398    [loggers]
1399    keys=root
1400
1401    [handlers]
1402    keys=file
1403
1404    [formatters]
1405    keys=
1406
1407    [logger_root]
1408    level=DEBUG
1409    handlers=file
1410
1411    [handler_file]
1412    class=FileHandler
1413    level=DEBUG
1414    args=("{tempfile}",)
1415    """
1416
1417    disable_test = """
1418    [loggers]
1419    keys=root
1420
1421    [handlers]
1422    keys=screen
1423
1424    [formatters]
1425    keys=
1426
1427    [logger_root]
1428    level=DEBUG
1429    handlers=screen
1430
1431    [handler_screen]
1432    level=DEBUG
1433    class=StreamHandler
1434    args=(sys.stdout,)
1435    formatter=
1436    """
1437
1438    def apply_config(self, conf, **kwargs):
1439        file = io.StringIO(textwrap.dedent(conf))
1440        logging.config.fileConfig(file, **kwargs)
1441
1442    def test_config0_ok(self):
1443        # A simple config file which overrides the default settings.
1444        with support.captured_stdout() as output:
1445            self.apply_config(self.config0)
1446            logger = logging.getLogger()
1447            # Won't output anything
1448            logger.info(self.next_message())
1449            # Outputs a message
1450            logger.error(self.next_message())
1451            self.assert_log_lines([
1452                ('ERROR', '2'),
1453            ], stream=output)
1454            # Original logger output is empty.
1455            self.assert_log_lines([])
1456
1457    def test_config0_using_cp_ok(self):
1458        # A simple config file which overrides the default settings.
1459        with support.captured_stdout() as output:
1460            file = io.StringIO(textwrap.dedent(self.config0))
1461            cp = configparser.ConfigParser()
1462            cp.read_file(file)
1463            logging.config.fileConfig(cp)
1464            logger = logging.getLogger()
1465            # Won't output anything
1466            logger.info(self.next_message())
1467            # Outputs a message
1468            logger.error(self.next_message())
1469            self.assert_log_lines([
1470                ('ERROR', '2'),
1471            ], stream=output)
1472            # Original logger output is empty.
1473            self.assert_log_lines([])
1474
1475    def test_config1_ok(self, config=config1):
1476        # A config file defining a sub-parser as well.
1477        with support.captured_stdout() as output:
1478            self.apply_config(config)
1479            logger = logging.getLogger("compiler.parser")
1480            # Both will output a message
1481            logger.info(self.next_message())
1482            logger.error(self.next_message())
1483            self.assert_log_lines([
1484                ('INFO', '1'),
1485                ('ERROR', '2'),
1486            ], stream=output)
1487            # Original logger output is empty.
1488            self.assert_log_lines([])
1489
1490    def test_config2_failure(self):
1491        # A simple config file which overrides the default settings.
1492        self.assertRaises(Exception, self.apply_config, self.config2)
1493
1494    def test_config3_failure(self):
1495        # A simple config file which overrides the default settings.
1496        self.assertRaises(Exception, self.apply_config, self.config3)
1497
1498    def test_config4_ok(self):
1499        # A config file specifying a custom formatter class.
1500        with support.captured_stdout() as output:
1501            self.apply_config(self.config4)
1502            logger = logging.getLogger()
1503            try:
1504                raise RuntimeError()
1505            except RuntimeError:
1506                logging.exception("just testing")
1507            sys.stdout.seek(0)
1508            self.assertEqual(output.getvalue(),
1509                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1510            # Original logger output is empty
1511            self.assert_log_lines([])
1512
1513    def test_config5_ok(self):
1514        self.test_config1_ok(config=self.config5)
1515
1516    def test_config6_ok(self):
1517        self.test_config1_ok(config=self.config6)
1518
1519    def test_config7_ok(self):
1520        with support.captured_stdout() as output:
1521            self.apply_config(self.config1a)
1522            logger = logging.getLogger("compiler.parser")
1523            # See issue #11424. compiler-hyphenated sorts
1524            # between compiler and compiler.xyz and this
1525            # was preventing compiler.xyz from being included
1526            # in the child loggers of compiler because of an
1527            # overzealous loop termination condition.
1528            hyphenated = logging.getLogger('compiler-hyphenated')
1529            # All will output a message
1530            logger.info(self.next_message())
1531            logger.error(self.next_message())
1532            hyphenated.critical(self.next_message())
1533            self.assert_log_lines([
1534                ('INFO', '1'),
1535                ('ERROR', '2'),
1536                ('CRITICAL', '3'),
1537            ], stream=output)
1538            # Original logger output is empty.
1539            self.assert_log_lines([])
1540        with support.captured_stdout() as output:
1541            self.apply_config(self.config7)
1542            logger = logging.getLogger("compiler.parser")
1543            self.assertFalse(logger.disabled)
1544            # Both will output a message
1545            logger.info(self.next_message())
1546            logger.error(self.next_message())
1547            logger = logging.getLogger("compiler.lexer")
1548            # Both will output a message
1549            logger.info(self.next_message())
1550            logger.error(self.next_message())
1551            # Will not appear
1552            hyphenated.critical(self.next_message())
1553            self.assert_log_lines([
1554                ('INFO', '4'),
1555                ('ERROR', '5'),
1556                ('INFO', '6'),
1557                ('ERROR', '7'),
1558            ], stream=output)
1559            # Original logger output is empty.
1560            self.assert_log_lines([])
1561
1562    def test_config8_ok(self):
1563
1564        def cleanup(h1, fn):
1565            h1.close()
1566            os.remove(fn)
1567
1568        with self.check_no_resource_warning():
1569            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
1570            os.close(fd)
1571
1572            # Replace single backslash with double backslash in windows
1573            # to avoid unicode error during string formatting
1574            if os.name == "nt":
1575                fn = fn.replace("\\", "\\\\")
1576
1577            config8 = self.config8.format(tempfile=fn)
1578
1579            self.apply_config(config8)
1580            self.apply_config(config8)
1581
1582        handler = logging.root.handlers[0]
1583        self.addCleanup(cleanup, handler, fn)
1584
1585    def test_logger_disabling(self):
1586        self.apply_config(self.disable_test)
1587        logger = logging.getLogger('some_pristine_logger')
1588        self.assertFalse(logger.disabled)
1589        self.apply_config(self.disable_test)
1590        self.assertTrue(logger.disabled)
1591        self.apply_config(self.disable_test, disable_existing_loggers=False)
1592        self.assertFalse(logger.disabled)
1593
1594    def test_defaults_do_no_interpolation(self):
1595        """bpo-33802 defaults should not get interpolated"""
1596        ini = textwrap.dedent("""
1597            [formatters]
1598            keys=default
1599
1600            [formatter_default]
1601
1602            [handlers]
1603            keys=console
1604
1605            [handler_console]
1606            class=logging.StreamHandler
1607            args=tuple()
1608
1609            [loggers]
1610            keys=root
1611
1612            [logger_root]
1613            formatter=default
1614            handlers=console
1615            """).strip()
1616        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
1617        try:
1618            os.write(fd, ini.encode('ascii'))
1619            os.close(fd)
1620            logging.config.fileConfig(
1621                fn,
1622                defaults=dict(
1623                    version=1,
1624                    disable_existing_loggers=False,
1625                    formatters={
1626                        "generic": {
1627                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
1628                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
1629                            "class": "logging.Formatter"
1630                        },
1631                    },
1632                )
1633            )
1634        finally:
1635            os.unlink(fn)
1636
1637
1638class SocketHandlerTest(BaseTest):
1639
1640    """Test for SocketHandler objects."""
1641
1642    server_class = TestTCPServer
1643    address = ('localhost', 0)
1644
1645    def setUp(self):
1646        """Set up a TCP server to receive log messages, and a SocketHandler
1647        pointing to that server's address and port."""
1648        BaseTest.setUp(self)
1649        # Issue #29177: deal with errors that happen during setup
1650        self.server = self.sock_hdlr = self.server_exception = None
1651        try:
1652            self.server = server = self.server_class(self.address,
1653                                                     self.handle_socket, 0.01)
1654            server.start()
1655            # Uncomment next line to test error recovery in setUp()
1656            # raise OSError('dummy error raised')
1657        except OSError as e:
1658            self.server_exception = e
1659            return
1660        server.ready.wait()
1661        hcls = logging.handlers.SocketHandler
1662        if isinstance(server.server_address, tuple):
1663            self.sock_hdlr = hcls('localhost', server.port)
1664        else:
1665            self.sock_hdlr = hcls(server.server_address, None)
1666        self.log_output = ''
1667        self.root_logger.removeHandler(self.root_logger.handlers[0])
1668        self.root_logger.addHandler(self.sock_hdlr)
1669        self.handled = threading.Semaphore(0)
1670
1671    def tearDown(self):
1672        """Shutdown the TCP server."""
1673        try:
1674            if self.sock_hdlr:
1675                self.root_logger.removeHandler(self.sock_hdlr)
1676                self.sock_hdlr.close()
1677            if self.server:
1678                self.server.stop(2.0)
1679        finally:
1680            BaseTest.tearDown(self)
1681
1682    def handle_socket(self, request):
1683        conn = request.connection
1684        while True:
1685            chunk = conn.recv(4)
1686            if len(chunk) < 4:
1687                break
1688            slen = struct.unpack(">L", chunk)[0]
1689            chunk = conn.recv(slen)
1690            while len(chunk) < slen:
1691                chunk = chunk + conn.recv(slen - len(chunk))
1692            obj = pickle.loads(chunk)
1693            record = logging.makeLogRecord(obj)
1694            self.log_output += record.msg + '\n'
1695            self.handled.release()
1696
1697    def test_output(self):
1698        # The log message sent to the SocketHandler is properly received.
1699        if self.server_exception:
1700            self.skipTest(self.server_exception)
1701        logger = logging.getLogger("tcp")
1702        logger.error("spam")
1703        self.handled.acquire()
1704        logger.debug("eggs")
1705        self.handled.acquire()
1706        self.assertEqual(self.log_output, "spam\neggs\n")
1707
1708    def test_noserver(self):
1709        if self.server_exception:
1710            self.skipTest(self.server_exception)
1711        # Avoid timing-related failures due to SocketHandler's own hard-wired
1712        # one-second timeout on socket.create_connection() (issue #16264).
1713        self.sock_hdlr.retryStart = 2.5
1714        # Kill the server
1715        self.server.stop(2.0)
1716        # The logging call should try to connect, which should fail
1717        try:
1718            raise RuntimeError('Deliberate mistake')
1719        except RuntimeError:
1720            self.root_logger.exception('Never sent')
1721        self.root_logger.error('Never sent, either')
1722        now = time.time()
1723        self.assertGreater(self.sock_hdlr.retryTime, now)
1724        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
1725        self.root_logger.error('Nor this')
1726
1727def _get_temp_domain_socket():
1728    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
1729    os.close(fd)
1730    # just need a name - file can't be present, or we'll get an
1731    # 'address already in use' error.
1732    os.remove(fn)
1733    return fn
1734
1735@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1736class UnixSocketHandlerTest(SocketHandlerTest):
1737
1738    """Test for SocketHandler with unix sockets."""
1739
1740    if hasattr(socket, "AF_UNIX"):
1741        server_class = TestUnixStreamServer
1742
1743    def setUp(self):
1744        # override the definition in the base class
1745        self.address = _get_temp_domain_socket()
1746        SocketHandlerTest.setUp(self)
1747
1748    def tearDown(self):
1749        SocketHandlerTest.tearDown(self)
1750        support.unlink(self.address)
1751
1752class DatagramHandlerTest(BaseTest):
1753
1754    """Test for DatagramHandler."""
1755
1756    server_class = TestUDPServer
1757    address = ('localhost', 0)
1758
1759    def setUp(self):
1760        """Set up a UDP server to receive log messages, and a DatagramHandler
1761        pointing to that server's address and port."""
1762        BaseTest.setUp(self)
1763        # Issue #29177: deal with errors that happen during setup
1764        self.server = self.sock_hdlr = self.server_exception = None
1765        try:
1766            self.server = server = self.server_class(self.address,
1767                                                     self.handle_datagram, 0.01)
1768            server.start()
1769            # Uncomment next line to test error recovery in setUp()
1770            # raise OSError('dummy error raised')
1771        except OSError as e:
1772            self.server_exception = e
1773            return
1774        server.ready.wait()
1775        hcls = logging.handlers.DatagramHandler
1776        if isinstance(server.server_address, tuple):
1777            self.sock_hdlr = hcls('localhost', server.port)
1778        else:
1779            self.sock_hdlr = hcls(server.server_address, None)
1780        self.log_output = ''
1781        self.root_logger.removeHandler(self.root_logger.handlers[0])
1782        self.root_logger.addHandler(self.sock_hdlr)
1783        self.handled = threading.Event()
1784
1785    def tearDown(self):
1786        """Shutdown the UDP server."""
1787        try:
1788            if self.server:
1789                self.server.stop(2.0)
1790            if self.sock_hdlr:
1791                self.root_logger.removeHandler(self.sock_hdlr)
1792                self.sock_hdlr.close()
1793        finally:
1794            BaseTest.tearDown(self)
1795
1796    def handle_datagram(self, request):
1797        slen = struct.pack('>L', 0) # length of prefix
1798        packet = request.packet[len(slen):]
1799        obj = pickle.loads(packet)
1800        record = logging.makeLogRecord(obj)
1801        self.log_output += record.msg + '\n'
1802        self.handled.set()
1803
1804    def test_output(self):
1805        # The log message sent to the DatagramHandler is properly received.
1806        if self.server_exception:
1807            self.skipTest(self.server_exception)
1808        logger = logging.getLogger("udp")
1809        logger.error("spam")
1810        self.handled.wait()
1811        self.handled.clear()
1812        logger.error("eggs")
1813        self.handled.wait()
1814        self.assertEqual(self.log_output, "spam\neggs\n")
1815
1816@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1817class UnixDatagramHandlerTest(DatagramHandlerTest):
1818
1819    """Test for DatagramHandler using Unix sockets."""
1820
1821    if hasattr(socket, "AF_UNIX"):
1822        server_class = TestUnixDatagramServer
1823
1824    def setUp(self):
1825        # override the definition in the base class
1826        self.address = _get_temp_domain_socket()
1827        DatagramHandlerTest.setUp(self)
1828
1829    def tearDown(self):
1830        DatagramHandlerTest.tearDown(self)
1831        support.unlink(self.address)
1832
1833class SysLogHandlerTest(BaseTest):
1834
1835    """Test for SysLogHandler using UDP."""
1836
1837    server_class = TestUDPServer
1838    address = ('localhost', 0)
1839
1840    def setUp(self):
1841        """Set up a UDP server to receive log messages, and a SysLogHandler
1842        pointing to that server's address and port."""
1843        BaseTest.setUp(self)
1844        # Issue #29177: deal with errors that happen during setup
1845        self.server = self.sl_hdlr = self.server_exception = None
1846        try:
1847            self.server = server = self.server_class(self.address,
1848                                                     self.handle_datagram, 0.01)
1849            server.start()
1850            # Uncomment next line to test error recovery in setUp()
1851            # raise OSError('dummy error raised')
1852        except OSError as e:
1853            self.server_exception = e
1854            return
1855        server.ready.wait()
1856        hcls = logging.handlers.SysLogHandler
1857        if isinstance(server.server_address, tuple):
1858            self.sl_hdlr = hcls((server.server_address[0], server.port))
1859        else:
1860            self.sl_hdlr = hcls(server.server_address)
1861        self.log_output = ''
1862        self.root_logger.removeHandler(self.root_logger.handlers[0])
1863        self.root_logger.addHandler(self.sl_hdlr)
1864        self.handled = threading.Event()
1865
1866    def tearDown(self):
1867        """Shutdown the server."""
1868        try:
1869            if self.server:
1870                self.server.stop(2.0)
1871            if self.sl_hdlr:
1872                self.root_logger.removeHandler(self.sl_hdlr)
1873                self.sl_hdlr.close()
1874        finally:
1875            BaseTest.tearDown(self)
1876
1877    def handle_datagram(self, request):
1878        self.log_output = request.packet
1879        self.handled.set()
1880
1881    def test_output(self):
1882        if self.server_exception:
1883            self.skipTest(self.server_exception)
1884        # The log message sent to the SysLogHandler is properly received.
1885        logger = logging.getLogger("slh")
1886        logger.error("sp\xe4m")
1887        self.handled.wait()
1888        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1889        self.handled.clear()
1890        self.sl_hdlr.append_nul = False
1891        logger.error("sp\xe4m")
1892        self.handled.wait()
1893        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
1894        self.handled.clear()
1895        self.sl_hdlr.ident = "h\xe4m-"
1896        logger.error("sp\xe4m")
1897        self.handled.wait()
1898        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1899
1900@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1901class UnixSysLogHandlerTest(SysLogHandlerTest):
1902
1903    """Test for SysLogHandler with Unix sockets."""
1904
1905    if hasattr(socket, "AF_UNIX"):
1906        server_class = TestUnixDatagramServer
1907
1908    def setUp(self):
1909        # override the definition in the base class
1910        self.address = _get_temp_domain_socket()
1911        SysLogHandlerTest.setUp(self)
1912
1913    def tearDown(self):
1914        SysLogHandlerTest.tearDown(self)
1915        support.unlink(self.address)
1916
1917@unittest.skipUnless(support.IPV6_ENABLED,
1918                     'IPv6 support required for this test.')
1919class IPv6SysLogHandlerTest(SysLogHandlerTest):
1920
1921    """Test for SysLogHandler with IPv6 host."""
1922
1923    server_class = TestUDPServer
1924    address = ('::1', 0)
1925
1926    def setUp(self):
1927        self.server_class.address_family = socket.AF_INET6
1928        super(IPv6SysLogHandlerTest, self).setUp()
1929
1930    def tearDown(self):
1931        self.server_class.address_family = socket.AF_INET
1932        super(IPv6SysLogHandlerTest, self).tearDown()
1933
1934class HTTPHandlerTest(BaseTest):
1935    """Test for HTTPHandler."""
1936
1937    def setUp(self):
1938        """Set up an HTTP server to receive log messages, and a HTTPHandler
1939        pointing to that server's address and port."""
1940        BaseTest.setUp(self)
1941        self.handled = threading.Event()
1942
1943    def handle_request(self, request):
1944        self.command = request.command
1945        self.log_data = urlparse(request.path)
1946        if self.command == 'POST':
1947            try:
1948                rlen = int(request.headers['Content-Length'])
1949                self.post_data = request.rfile.read(rlen)
1950            except:
1951                self.post_data = None
1952        request.send_response(200)
1953        request.end_headers()
1954        self.handled.set()
1955
1956    def test_output(self):
1957        # The log message sent to the HTTPHandler is properly received.
1958        logger = logging.getLogger("http")
1959        root_logger = self.root_logger
1960        root_logger.removeHandler(self.root_logger.handlers[0])
1961        for secure in (False, True):
1962            addr = ('localhost', 0)
1963            if secure:
1964                try:
1965                    import ssl
1966                except ImportError:
1967                    sslctx = None
1968                else:
1969                    here = os.path.dirname(__file__)
1970                    localhost_cert = os.path.join(here, "keycert.pem")
1971                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1972                    sslctx.load_cert_chain(localhost_cert)
1973
1974                    context = ssl.create_default_context(cafile=localhost_cert)
1975            else:
1976                sslctx = None
1977                context = None
1978            self.server = server = TestHTTPServer(addr, self.handle_request,
1979                                                    0.01, sslctx=sslctx)
1980            server.start()
1981            server.ready.wait()
1982            host = 'localhost:%d' % server.server_port
1983            secure_client = secure and sslctx
1984            self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
1985                                                       secure=secure_client,
1986                                                       context=context,
1987                                                       credentials=('foo', 'bar'))
1988            self.log_data = None
1989            root_logger.addHandler(self.h_hdlr)
1990
1991            for method in ('GET', 'POST'):
1992                self.h_hdlr.method = method
1993                self.handled.clear()
1994                msg = "sp\xe4m"
1995                logger.error(msg)
1996                self.handled.wait()
1997                self.assertEqual(self.log_data.path, '/frob')
1998                self.assertEqual(self.command, method)
1999                if method == 'GET':
2000                    d = parse_qs(self.log_data.query)
2001                else:
2002                    d = parse_qs(self.post_data.decode('utf-8'))
2003                self.assertEqual(d['name'], ['http'])
2004                self.assertEqual(d['funcName'], ['test_output'])
2005                self.assertEqual(d['msg'], [msg])
2006
2007            self.server.stop(2.0)
2008            self.root_logger.removeHandler(self.h_hdlr)
2009            self.h_hdlr.close()
2010
2011class MemoryTest(BaseTest):
2012
2013    """Test memory persistence of logger objects."""
2014
2015    def setUp(self):
2016        """Create a dict to remember potentially destroyed objects."""
2017        BaseTest.setUp(self)
2018        self._survivors = {}
2019
2020    def _watch_for_survival(self, *args):
2021        """Watch the given objects for survival, by creating weakrefs to
2022        them."""
2023        for obj in args:
2024            key = id(obj), repr(obj)
2025            self._survivors[key] = weakref.ref(obj)
2026
2027    def _assertTruesurvival(self):
2028        """Assert that all objects watched for survival have survived."""
2029        # Trigger cycle breaking.
2030        gc.collect()
2031        dead = []
2032        for (id_, repr_), ref in self._survivors.items():
2033            if ref() is None:
2034                dead.append(repr_)
2035        if dead:
2036            self.fail("%d objects should have survived "
2037                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
2038
2039    def test_persistent_loggers(self):
2040        # Logger objects are persistent and retain their configuration, even
2041        #  if visible references are destroyed.
2042        self.root_logger.setLevel(logging.INFO)
2043        foo = logging.getLogger("foo")
2044        self._watch_for_survival(foo)
2045        foo.setLevel(logging.DEBUG)
2046        self.root_logger.debug(self.next_message())
2047        foo.debug(self.next_message())
2048        self.assert_log_lines([
2049            ('foo', 'DEBUG', '2'),
2050        ])
2051        del foo
2052        # foo has survived.
2053        self._assertTruesurvival()
2054        # foo has retained its settings.
2055        bar = logging.getLogger("foo")
2056        bar.debug(self.next_message())
2057        self.assert_log_lines([
2058            ('foo', 'DEBUG', '2'),
2059            ('foo', 'DEBUG', '3'),
2060        ])
2061
2062
2063class EncodingTest(BaseTest):
2064    def test_encoding_plain_file(self):
2065        # In Python 2.x, a plain file object is treated as having no encoding.
2066        log = logging.getLogger("test")
2067        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
2068        os.close(fd)
2069        # the non-ascii data we write to the log.
2070        data = "foo\x80"
2071        try:
2072            handler = logging.FileHandler(fn, encoding="utf-8")
2073            log.addHandler(handler)
2074            try:
2075                # write non-ascii data to the log.
2076                log.warning(data)
2077            finally:
2078                log.removeHandler(handler)
2079                handler.close()
2080            # check we wrote exactly those bytes, ignoring trailing \n etc
2081            f = open(fn, encoding="utf-8")
2082            try:
2083                self.assertEqual(f.read().rstrip(), data)
2084            finally:
2085                f.close()
2086        finally:
2087            if os.path.isfile(fn):
2088                os.remove(fn)
2089
2090    def test_encoding_cyrillic_unicode(self):
2091        log = logging.getLogger("test")
2092        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
2093        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
2094        # Ensure it's written in a Cyrillic encoding
2095        writer_class = codecs.getwriter('cp1251')
2096        writer_class.encoding = 'cp1251'
2097        stream = io.BytesIO()
2098        writer = writer_class(stream, 'strict')
2099        handler = logging.StreamHandler(writer)
2100        log.addHandler(handler)
2101        try:
2102            log.warning(message)
2103        finally:
2104            log.removeHandler(handler)
2105            handler.close()
2106        # check we wrote exactly those bytes, ignoring trailing \n etc
2107        s = stream.getvalue()
2108        # Compare against what the data should be when encoded in CP-1251
2109        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
2110
2111
2112class WarningsTest(BaseTest):
2113
2114    def test_warnings(self):
2115        with warnings.catch_warnings():
2116            logging.captureWarnings(True)
2117            self.addCleanup(logging.captureWarnings, False)
2118            warnings.filterwarnings("always", category=UserWarning)
2119            stream = io.StringIO()
2120            h = logging.StreamHandler(stream)
2121            logger = logging.getLogger("py.warnings")
2122            logger.addHandler(h)
2123            warnings.warn("I'm warning you...")
2124            logger.removeHandler(h)
2125            s = stream.getvalue()
2126            h.close()
2127            self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
2128
2129            # See if an explicit file uses the original implementation
2130            a_file = io.StringIO()
2131            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
2132                                 a_file, "Dummy line")
2133            s = a_file.getvalue()
2134            a_file.close()
2135            self.assertEqual(s,
2136                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
2137
2138    def test_warnings_no_handlers(self):
2139        with warnings.catch_warnings():
2140            logging.captureWarnings(True)
2141            self.addCleanup(logging.captureWarnings, False)
2142
2143            # confirm our assumption: no loggers are set
2144            logger = logging.getLogger("py.warnings")
2145            self.assertEqual(logger.handlers, [])
2146
2147            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
2148            self.assertEqual(len(logger.handlers), 1)
2149            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
2150
2151
2152def formatFunc(format, datefmt=None):
2153    return logging.Formatter(format, datefmt)
2154
2155class myCustomFormatter:
2156    def __init__(self, fmt, datefmt=None):
2157        pass
2158
2159def handlerFunc():
2160    return logging.StreamHandler()
2161
2162class CustomHandler(logging.StreamHandler):
2163    pass
2164
2165class ConfigDictTest(BaseTest):
2166
2167    """Reading logging config from a dictionary."""
2168
2169    check_no_resource_warning = support.check_no_resource_warning
2170    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2171
2172    # config0 is a standard configuration.
2173    config0 = {
2174        'version': 1,
2175        'formatters': {
2176            'form1' : {
2177                'format' : '%(levelname)s ++ %(message)s',
2178            },
2179        },
2180        'handlers' : {
2181            'hand1' : {
2182                'class' : 'logging.StreamHandler',
2183                'formatter' : 'form1',
2184                'level' : 'NOTSET',
2185                'stream'  : 'ext://sys.stdout',
2186            },
2187        },
2188        'root' : {
2189            'level' : 'WARNING',
2190            'handlers' : ['hand1'],
2191        },
2192    }
2193
2194    # config1 adds a little to the standard configuration.
2195    config1 = {
2196        'version': 1,
2197        'formatters': {
2198            'form1' : {
2199                'format' : '%(levelname)s ++ %(message)s',
2200            },
2201        },
2202        'handlers' : {
2203            'hand1' : {
2204                'class' : 'logging.StreamHandler',
2205                'formatter' : 'form1',
2206                'level' : 'NOTSET',
2207                'stream'  : 'ext://sys.stdout',
2208            },
2209        },
2210        'loggers' : {
2211            'compiler.parser' : {
2212                'level' : 'DEBUG',
2213                'handlers' : ['hand1'],
2214            },
2215        },
2216        'root' : {
2217            'level' : 'WARNING',
2218        },
2219    }
2220
2221    # config1a moves the handler to the root. Used with config8a
2222    config1a = {
2223        'version': 1,
2224        'formatters': {
2225            'form1' : {
2226                'format' : '%(levelname)s ++ %(message)s',
2227            },
2228        },
2229        'handlers' : {
2230            'hand1' : {
2231                'class' : 'logging.StreamHandler',
2232                'formatter' : 'form1',
2233                'level' : 'NOTSET',
2234                'stream'  : 'ext://sys.stdout',
2235            },
2236        },
2237        'loggers' : {
2238            'compiler.parser' : {
2239                'level' : 'DEBUG',
2240            },
2241        },
2242        'root' : {
2243            'level' : 'WARNING',
2244            'handlers' : ['hand1'],
2245        },
2246    }
2247
2248    # config2 has a subtle configuration error that should be reported
2249    config2 = {
2250        'version': 1,
2251        'formatters': {
2252            'form1' : {
2253                'format' : '%(levelname)s ++ %(message)s',
2254            },
2255        },
2256        'handlers' : {
2257            'hand1' : {
2258                'class' : 'logging.StreamHandler',
2259                'formatter' : 'form1',
2260                'level' : 'NOTSET',
2261                'stream'  : 'ext://sys.stdbout',
2262            },
2263        },
2264        'loggers' : {
2265            'compiler.parser' : {
2266                'level' : 'DEBUG',
2267                'handlers' : ['hand1'],
2268            },
2269        },
2270        'root' : {
2271            'level' : 'WARNING',
2272        },
2273    }
2274
2275    # As config1 but with a misspelt level on a handler
2276    config2a = {
2277        'version': 1,
2278        'formatters': {
2279            'form1' : {
2280                'format' : '%(levelname)s ++ %(message)s',
2281            },
2282        },
2283        'handlers' : {
2284            'hand1' : {
2285                'class' : 'logging.StreamHandler',
2286                'formatter' : 'form1',
2287                'level' : 'NTOSET',
2288                'stream'  : 'ext://sys.stdout',
2289            },
2290        },
2291        'loggers' : {
2292            'compiler.parser' : {
2293                'level' : 'DEBUG',
2294                'handlers' : ['hand1'],
2295            },
2296        },
2297        'root' : {
2298            'level' : 'WARNING',
2299        },
2300    }
2301
2302
2303    # As config1 but with a misspelt level on a logger
2304    config2b = {
2305        'version': 1,
2306        'formatters': {
2307            'form1' : {
2308                'format' : '%(levelname)s ++ %(message)s',
2309            },
2310        },
2311        'handlers' : {
2312            'hand1' : {
2313                'class' : 'logging.StreamHandler',
2314                'formatter' : 'form1',
2315                'level' : 'NOTSET',
2316                'stream'  : 'ext://sys.stdout',
2317            },
2318        },
2319        'loggers' : {
2320            'compiler.parser' : {
2321                'level' : 'DEBUG',
2322                'handlers' : ['hand1'],
2323            },
2324        },
2325        'root' : {
2326            'level' : 'WRANING',
2327        },
2328    }
2329
2330    # config3 has a less subtle configuration error
2331    config3 = {
2332        'version': 1,
2333        'formatters': {
2334            'form1' : {
2335                'format' : '%(levelname)s ++ %(message)s',
2336            },
2337        },
2338        'handlers' : {
2339            'hand1' : {
2340                'class' : 'logging.StreamHandler',
2341                'formatter' : 'misspelled_name',
2342                'level' : 'NOTSET',
2343                'stream'  : 'ext://sys.stdout',
2344            },
2345        },
2346        'loggers' : {
2347            'compiler.parser' : {
2348                'level' : 'DEBUG',
2349                'handlers' : ['hand1'],
2350            },
2351        },
2352        'root' : {
2353            'level' : 'WARNING',
2354        },
2355    }
2356
2357    # config4 specifies a custom formatter class to be loaded
2358    config4 = {
2359        'version': 1,
2360        'formatters': {
2361            'form1' : {
2362                '()' : __name__ + '.ExceptionFormatter',
2363                'format' : '%(levelname)s:%(name)s:%(message)s',
2364            },
2365        },
2366        'handlers' : {
2367            'hand1' : {
2368                'class' : 'logging.StreamHandler',
2369                'formatter' : 'form1',
2370                'level' : 'NOTSET',
2371                'stream'  : 'ext://sys.stdout',
2372            },
2373        },
2374        'root' : {
2375            'level' : 'NOTSET',
2376                'handlers' : ['hand1'],
2377        },
2378    }
2379
2380    # As config4 but using an actual callable rather than a string
2381    config4a = {
2382        'version': 1,
2383        'formatters': {
2384            'form1' : {
2385                '()' : ExceptionFormatter,
2386                'format' : '%(levelname)s:%(name)s:%(message)s',
2387            },
2388            'form2' : {
2389                '()' : __name__ + '.formatFunc',
2390                'format' : '%(levelname)s:%(name)s:%(message)s',
2391            },
2392            'form3' : {
2393                '()' : formatFunc,
2394                'format' : '%(levelname)s:%(name)s:%(message)s',
2395            },
2396        },
2397        'handlers' : {
2398            'hand1' : {
2399                'class' : 'logging.StreamHandler',
2400                'formatter' : 'form1',
2401                'level' : 'NOTSET',
2402                'stream'  : 'ext://sys.stdout',
2403            },
2404            'hand2' : {
2405                '()' : handlerFunc,
2406            },
2407        },
2408        'root' : {
2409            'level' : 'NOTSET',
2410                'handlers' : ['hand1'],
2411        },
2412    }
2413
2414    # config5 specifies a custom handler class to be loaded
2415    config5 = {
2416        'version': 1,
2417        'formatters': {
2418            'form1' : {
2419                'format' : '%(levelname)s ++ %(message)s',
2420            },
2421        },
2422        'handlers' : {
2423            'hand1' : {
2424                'class' : __name__ + '.CustomHandler',
2425                'formatter' : 'form1',
2426                'level' : 'NOTSET',
2427                'stream'  : 'ext://sys.stdout',
2428            },
2429        },
2430        'loggers' : {
2431            'compiler.parser' : {
2432                'level' : 'DEBUG',
2433                'handlers' : ['hand1'],
2434            },
2435        },
2436        'root' : {
2437            'level' : 'WARNING',
2438        },
2439    }
2440
2441    # config6 specifies a custom handler class to be loaded
2442    # but has bad arguments
2443    config6 = {
2444        'version': 1,
2445        'formatters': {
2446            'form1' : {
2447                'format' : '%(levelname)s ++ %(message)s',
2448            },
2449        },
2450        'handlers' : {
2451            'hand1' : {
2452                'class' : __name__ + '.CustomHandler',
2453                'formatter' : 'form1',
2454                'level' : 'NOTSET',
2455                'stream'  : 'ext://sys.stdout',
2456                '9' : 'invalid parameter name',
2457            },
2458        },
2459        'loggers' : {
2460            'compiler.parser' : {
2461                'level' : 'DEBUG',
2462                'handlers' : ['hand1'],
2463            },
2464        },
2465        'root' : {
2466            'level' : 'WARNING',
2467        },
2468    }
2469
2470    # config 7 does not define compiler.parser but defines compiler.lexer
2471    # so compiler.parser should be disabled after applying it
2472    config7 = {
2473        'version': 1,
2474        'formatters': {
2475            'form1' : {
2476                'format' : '%(levelname)s ++ %(message)s',
2477            },
2478        },
2479        'handlers' : {
2480            'hand1' : {
2481                'class' : 'logging.StreamHandler',
2482                'formatter' : 'form1',
2483                'level' : 'NOTSET',
2484                'stream'  : 'ext://sys.stdout',
2485            },
2486        },
2487        'loggers' : {
2488            'compiler.lexer' : {
2489                'level' : 'DEBUG',
2490                'handlers' : ['hand1'],
2491            },
2492        },
2493        'root' : {
2494            'level' : 'WARNING',
2495        },
2496    }
2497
2498    # config8 defines both compiler and compiler.lexer
2499    # so compiler.parser should not be disabled (since
2500    # compiler is defined)
2501    config8 = {
2502        'version': 1,
2503        'disable_existing_loggers' : False,
2504        'formatters': {
2505            'form1' : {
2506                'format' : '%(levelname)s ++ %(message)s',
2507            },
2508        },
2509        'handlers' : {
2510            'hand1' : {
2511                'class' : 'logging.StreamHandler',
2512                'formatter' : 'form1',
2513                'level' : 'NOTSET',
2514                'stream'  : 'ext://sys.stdout',
2515            },
2516        },
2517        'loggers' : {
2518            'compiler' : {
2519                'level' : 'DEBUG',
2520                'handlers' : ['hand1'],
2521            },
2522            'compiler.lexer' : {
2523            },
2524        },
2525        'root' : {
2526            'level' : 'WARNING',
2527        },
2528    }
2529
2530    # config8a disables existing loggers
2531    config8a = {
2532        'version': 1,
2533        'disable_existing_loggers' : True,
2534        'formatters': {
2535            'form1' : {
2536                'format' : '%(levelname)s ++ %(message)s',
2537            },
2538        },
2539        'handlers' : {
2540            'hand1' : {
2541                'class' : 'logging.StreamHandler',
2542                'formatter' : 'form1',
2543                'level' : 'NOTSET',
2544                'stream'  : 'ext://sys.stdout',
2545            },
2546        },
2547        'loggers' : {
2548            'compiler' : {
2549                'level' : 'DEBUG',
2550                'handlers' : ['hand1'],
2551            },
2552            'compiler.lexer' : {
2553            },
2554        },
2555        'root' : {
2556            'level' : 'WARNING',
2557        },
2558    }
2559
2560    config9 = {
2561        'version': 1,
2562        'formatters': {
2563            'form1' : {
2564                'format' : '%(levelname)s ++ %(message)s',
2565            },
2566        },
2567        'handlers' : {
2568            'hand1' : {
2569                'class' : 'logging.StreamHandler',
2570                'formatter' : 'form1',
2571                'level' : 'WARNING',
2572                'stream'  : 'ext://sys.stdout',
2573            },
2574        },
2575        'loggers' : {
2576            'compiler.parser' : {
2577                'level' : 'WARNING',
2578                'handlers' : ['hand1'],
2579            },
2580        },
2581        'root' : {
2582            'level' : 'NOTSET',
2583        },
2584    }
2585
2586    config9a = {
2587        'version': 1,
2588        'incremental' : True,
2589        'handlers' : {
2590            'hand1' : {
2591                'level' : 'WARNING',
2592            },
2593        },
2594        'loggers' : {
2595            'compiler.parser' : {
2596                'level' : 'INFO',
2597            },
2598        },
2599    }
2600
2601    config9b = {
2602        'version': 1,
2603        'incremental' : True,
2604        'handlers' : {
2605            'hand1' : {
2606                'level' : 'INFO',
2607            },
2608        },
2609        'loggers' : {
2610            'compiler.parser' : {
2611                'level' : 'INFO',
2612            },
2613        },
2614    }
2615
2616    # As config1 but with a filter added
2617    config10 = {
2618        'version': 1,
2619        'formatters': {
2620            'form1' : {
2621                'format' : '%(levelname)s ++ %(message)s',
2622            },
2623        },
2624        'filters' : {
2625            'filt1' : {
2626                'name' : 'compiler.parser',
2627            },
2628        },
2629        'handlers' : {
2630            'hand1' : {
2631                'class' : 'logging.StreamHandler',
2632                'formatter' : 'form1',
2633                'level' : 'NOTSET',
2634                'stream'  : 'ext://sys.stdout',
2635                'filters' : ['filt1'],
2636            },
2637        },
2638        'loggers' : {
2639            'compiler.parser' : {
2640                'level' : 'DEBUG',
2641                'filters' : ['filt1'],
2642            },
2643        },
2644        'root' : {
2645            'level' : 'WARNING',
2646            'handlers' : ['hand1'],
2647        },
2648    }
2649
2650    # As config1 but using cfg:// references
2651    config11 = {
2652        'version': 1,
2653        'true_formatters': {
2654            'form1' : {
2655                'format' : '%(levelname)s ++ %(message)s',
2656            },
2657        },
2658        'handler_configs': {
2659            'hand1' : {
2660                'class' : 'logging.StreamHandler',
2661                'formatter' : 'form1',
2662                'level' : 'NOTSET',
2663                'stream'  : 'ext://sys.stdout',
2664            },
2665        },
2666        'formatters' : 'cfg://true_formatters',
2667        'handlers' : {
2668            'hand1' : 'cfg://handler_configs[hand1]',
2669        },
2670        'loggers' : {
2671            'compiler.parser' : {
2672                'level' : 'DEBUG',
2673                'handlers' : ['hand1'],
2674            },
2675        },
2676        'root' : {
2677            'level' : 'WARNING',
2678        },
2679    }
2680
2681    # As config11 but missing the version key
2682    config12 = {
2683        'true_formatters': {
2684            'form1' : {
2685                'format' : '%(levelname)s ++ %(message)s',
2686            },
2687        },
2688        'handler_configs': {
2689            'hand1' : {
2690                'class' : 'logging.StreamHandler',
2691                'formatter' : 'form1',
2692                'level' : 'NOTSET',
2693                'stream'  : 'ext://sys.stdout',
2694            },
2695        },
2696        'formatters' : 'cfg://true_formatters',
2697        'handlers' : {
2698            'hand1' : 'cfg://handler_configs[hand1]',
2699        },
2700        'loggers' : {
2701            'compiler.parser' : {
2702                'level' : 'DEBUG',
2703                'handlers' : ['hand1'],
2704            },
2705        },
2706        'root' : {
2707            'level' : 'WARNING',
2708        },
2709    }
2710
2711    # As config11 but using an unsupported version
2712    config13 = {
2713        'version': 2,
2714        'true_formatters': {
2715            'form1' : {
2716                'format' : '%(levelname)s ++ %(message)s',
2717            },
2718        },
2719        'handler_configs': {
2720            'hand1' : {
2721                'class' : 'logging.StreamHandler',
2722                'formatter' : 'form1',
2723                'level' : 'NOTSET',
2724                'stream'  : 'ext://sys.stdout',
2725            },
2726        },
2727        'formatters' : 'cfg://true_formatters',
2728        'handlers' : {
2729            'hand1' : 'cfg://handler_configs[hand1]',
2730        },
2731        'loggers' : {
2732            'compiler.parser' : {
2733                'level' : 'DEBUG',
2734                'handlers' : ['hand1'],
2735            },
2736        },
2737        'root' : {
2738            'level' : 'WARNING',
2739        },
2740    }
2741
2742    # As config0, but with properties
2743    config14 = {
2744        'version': 1,
2745        'formatters': {
2746            'form1' : {
2747                'format' : '%(levelname)s ++ %(message)s',
2748            },
2749        },
2750        'handlers' : {
2751            'hand1' : {
2752                'class' : 'logging.StreamHandler',
2753                'formatter' : 'form1',
2754                'level' : 'NOTSET',
2755                'stream'  : 'ext://sys.stdout',
2756                '.': {
2757                    'foo': 'bar',
2758                    'terminator': '!\n',
2759                }
2760            },
2761        },
2762        'root' : {
2763            'level' : 'WARNING',
2764            'handlers' : ['hand1'],
2765        },
2766    }
2767
2768    out_of_order = {
2769        "version": 1,
2770        "formatters": {
2771            "mySimpleFormatter": {
2772                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2773                "style": "$"
2774            }
2775        },
2776        "handlers": {
2777            "fileGlobal": {
2778                "class": "logging.StreamHandler",
2779                "level": "DEBUG",
2780                "formatter": "mySimpleFormatter"
2781            },
2782            "bufferGlobal": {
2783                "class": "logging.handlers.MemoryHandler",
2784                "capacity": 5,
2785                "formatter": "mySimpleFormatter",
2786                "target": "fileGlobal",
2787                "level": "DEBUG"
2788                }
2789        },
2790        "loggers": {
2791            "mymodule": {
2792                "level": "DEBUG",
2793                "handlers": ["bufferGlobal"],
2794                "propagate": "true"
2795            }
2796        }
2797    }
2798
2799    # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False
2800    custom_formatter_class_validate = {
2801        'version': 1,
2802        'formatters': {
2803            'form1': {
2804                '()': __name__ + '.ExceptionFormatter',
2805                'format': '%(levelname)s:%(name)s:%(message)s',
2806                'validate': False,
2807            },
2808        },
2809        'handlers' : {
2810            'hand1' : {
2811                'class': 'logging.StreamHandler',
2812                'formatter': 'form1',
2813                'level': 'NOTSET',
2814                'stream': 'ext://sys.stdout',
2815            },
2816        },
2817        "loggers": {
2818            "my_test_logger_custom_formatter": {
2819                "level": "DEBUG",
2820                "handlers": ["hand1"],
2821                "propagate": "true"
2822            }
2823        }
2824    }
2825
2826    # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False
2827    custom_formatter_class_validate2 = {
2828        'version': 1,
2829        'formatters': {
2830            'form1': {
2831                'class': __name__ + '.ExceptionFormatter',
2832                'format': '%(levelname)s:%(name)s:%(message)s',
2833                'validate': False,
2834            },
2835        },
2836        'handlers' : {
2837            'hand1' : {
2838                'class': 'logging.StreamHandler',
2839                'formatter': 'form1',
2840                'level': 'NOTSET',
2841                'stream': 'ext://sys.stdout',
2842            },
2843        },
2844        "loggers": {
2845            "my_test_logger_custom_formatter": {
2846                "level": "DEBUG",
2847                "handlers": ["hand1"],
2848                "propagate": "true"
2849            }
2850        }
2851    }
2852
2853    # Configuration with custom class that is not inherited from logging.Formatter
2854    custom_formatter_class_validate3 = {
2855        'version': 1,
2856        'formatters': {
2857            'form1': {
2858                'class': __name__ + '.myCustomFormatter',
2859                'format': '%(levelname)s:%(name)s:%(message)s',
2860                'validate': False,
2861            },
2862        },
2863        'handlers' : {
2864            'hand1' : {
2865                'class': 'logging.StreamHandler',
2866                'formatter': 'form1',
2867                'level': 'NOTSET',
2868                'stream': 'ext://sys.stdout',
2869            },
2870        },
2871        "loggers": {
2872            "my_test_logger_custom_formatter": {
2873                "level": "DEBUG",
2874                "handlers": ["hand1"],
2875                "propagate": "true"
2876            }
2877        }
2878    }
2879
2880    # Configuration with custom function and 'validate' set to False
2881    custom_formatter_with_function = {
2882        'version': 1,
2883        'formatters': {
2884            'form1': {
2885                '()': formatFunc,
2886                'format': '%(levelname)s:%(name)s:%(message)s',
2887                'validate': False,
2888            },
2889        },
2890        'handlers' : {
2891            'hand1' : {
2892                'class': 'logging.StreamHandler',
2893                'formatter': 'form1',
2894                'level': 'NOTSET',
2895                'stream': 'ext://sys.stdout',
2896            },
2897        },
2898        "loggers": {
2899            "my_test_logger_custom_formatter": {
2900                "level": "DEBUG",
2901                "handlers": ["hand1"],
2902                "propagate": "true"
2903            }
2904        }
2905    }
2906
2907    def apply_config(self, conf):
2908        logging.config.dictConfig(conf)
2909
2910    def test_config0_ok(self):
2911        # A simple config which overrides the default settings.
2912        with support.captured_stdout() as output:
2913            self.apply_config(self.config0)
2914            logger = logging.getLogger()
2915            # Won't output anything
2916            logger.info(self.next_message())
2917            # Outputs a message
2918            logger.error(self.next_message())
2919            self.assert_log_lines([
2920                ('ERROR', '2'),
2921            ], stream=output)
2922            # Original logger output is empty.
2923            self.assert_log_lines([])
2924
2925    def test_config1_ok(self, config=config1):
2926        # A config defining a sub-parser as well.
2927        with support.captured_stdout() as output:
2928            self.apply_config(config)
2929            logger = logging.getLogger("compiler.parser")
2930            # Both will output a message
2931            logger.info(self.next_message())
2932            logger.error(self.next_message())
2933            self.assert_log_lines([
2934                ('INFO', '1'),
2935                ('ERROR', '2'),
2936            ], stream=output)
2937            # Original logger output is empty.
2938            self.assert_log_lines([])
2939
2940    def test_config2_failure(self):
2941        # A simple config which overrides the default settings.
2942        self.assertRaises(Exception, self.apply_config, self.config2)
2943
2944    def test_config2a_failure(self):
2945        # A simple config which overrides the default settings.
2946        self.assertRaises(Exception, self.apply_config, self.config2a)
2947
2948    def test_config2b_failure(self):
2949        # A simple config which overrides the default settings.
2950        self.assertRaises(Exception, self.apply_config, self.config2b)
2951
2952    def test_config3_failure(self):
2953        # A simple config which overrides the default settings.
2954        self.assertRaises(Exception, self.apply_config, self.config3)
2955
2956    def test_config4_ok(self):
2957        # A config specifying a custom formatter class.
2958        with support.captured_stdout() as output:
2959            self.apply_config(self.config4)
2960            #logger = logging.getLogger()
2961            try:
2962                raise RuntimeError()
2963            except RuntimeError:
2964                logging.exception("just testing")
2965            sys.stdout.seek(0)
2966            self.assertEqual(output.getvalue(),
2967                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2968            # Original logger output is empty
2969            self.assert_log_lines([])
2970
2971    def test_config4a_ok(self):
2972        # A config specifying a custom formatter class.
2973        with support.captured_stdout() as output:
2974            self.apply_config(self.config4a)
2975            #logger = logging.getLogger()
2976            try:
2977                raise RuntimeError()
2978            except RuntimeError:
2979                logging.exception("just testing")
2980            sys.stdout.seek(0)
2981            self.assertEqual(output.getvalue(),
2982                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2983            # Original logger output is empty
2984            self.assert_log_lines([])
2985
2986    def test_config5_ok(self):
2987        self.test_config1_ok(config=self.config5)
2988
2989    def test_config6_failure(self):
2990        self.assertRaises(Exception, self.apply_config, self.config6)
2991
2992    def test_config7_ok(self):
2993        with support.captured_stdout() as output:
2994            self.apply_config(self.config1)
2995            logger = logging.getLogger("compiler.parser")
2996            # Both will output a message
2997            logger.info(self.next_message())
2998            logger.error(self.next_message())
2999            self.assert_log_lines([
3000                ('INFO', '1'),
3001                ('ERROR', '2'),
3002            ], stream=output)
3003            # Original logger output is empty.
3004            self.assert_log_lines([])
3005        with support.captured_stdout() as output:
3006            self.apply_config(self.config7)
3007            logger = logging.getLogger("compiler.parser")
3008            self.assertTrue(logger.disabled)
3009            logger = logging.getLogger("compiler.lexer")
3010            # Both will output a message
3011            logger.info(self.next_message())
3012            logger.error(self.next_message())
3013            self.assert_log_lines([
3014                ('INFO', '3'),
3015                ('ERROR', '4'),
3016            ], stream=output)
3017            # Original logger output is empty.
3018            self.assert_log_lines([])
3019
3020    # Same as test_config_7_ok but don't disable old loggers.
3021    def test_config_8_ok(self):
3022        with support.captured_stdout() as output:
3023            self.apply_config(self.config1)
3024            logger = logging.getLogger("compiler.parser")
3025            # All will output a message
3026            logger.info(self.next_message())
3027            logger.error(self.next_message())
3028            self.assert_log_lines([
3029                ('INFO', '1'),
3030                ('ERROR', '2'),
3031            ], stream=output)
3032            # Original logger output is empty.
3033            self.assert_log_lines([])
3034        with support.captured_stdout() as output:
3035            self.apply_config(self.config8)
3036            logger = logging.getLogger("compiler.parser")
3037            self.assertFalse(logger.disabled)
3038            # Both will output a message
3039            logger.info(self.next_message())
3040            logger.error(self.next_message())
3041            logger = logging.getLogger("compiler.lexer")
3042            # Both will output a message
3043            logger.info(self.next_message())
3044            logger.error(self.next_message())
3045            self.assert_log_lines([
3046                ('INFO', '3'),
3047                ('ERROR', '4'),
3048                ('INFO', '5'),
3049                ('ERROR', '6'),
3050            ], stream=output)
3051            # Original logger output is empty.
3052            self.assert_log_lines([])
3053
3054    def test_config_8a_ok(self):
3055        with support.captured_stdout() as output:
3056            self.apply_config(self.config1a)
3057            logger = logging.getLogger("compiler.parser")
3058            # See issue #11424. compiler-hyphenated sorts
3059            # between compiler and compiler.xyz and this
3060            # was preventing compiler.xyz from being included
3061            # in the child loggers of compiler because of an
3062            # overzealous loop termination condition.
3063            hyphenated = logging.getLogger('compiler-hyphenated')
3064            # All will output a message
3065            logger.info(self.next_message())
3066            logger.error(self.next_message())
3067            hyphenated.critical(self.next_message())
3068            self.assert_log_lines([
3069                ('INFO', '1'),
3070                ('ERROR', '2'),
3071                ('CRITICAL', '3'),
3072            ], stream=output)
3073            # Original logger output is empty.
3074            self.assert_log_lines([])
3075        with support.captured_stdout() as output:
3076            self.apply_config(self.config8a)
3077            logger = logging.getLogger("compiler.parser")
3078            self.assertFalse(logger.disabled)
3079            # Both will output a message
3080            logger.info(self.next_message())
3081            logger.error(self.next_message())
3082            logger = logging.getLogger("compiler.lexer")
3083            # Both will output a message
3084            logger.info(self.next_message())
3085            logger.error(self.next_message())
3086            # Will not appear
3087            hyphenated.critical(self.next_message())
3088            self.assert_log_lines([
3089                ('INFO', '4'),
3090                ('ERROR', '5'),
3091                ('INFO', '6'),
3092                ('ERROR', '7'),
3093            ], stream=output)
3094            # Original logger output is empty.
3095            self.assert_log_lines([])
3096
3097    def test_config_9_ok(self):
3098        with support.captured_stdout() as output:
3099            self.apply_config(self.config9)
3100            logger = logging.getLogger("compiler.parser")
3101            # Nothing will be output since both handler and logger are set to WARNING
3102            logger.info(self.next_message())
3103            self.assert_log_lines([], stream=output)
3104            self.apply_config(self.config9a)
3105            # Nothing will be output since handler is still set to WARNING
3106            logger.info(self.next_message())
3107            self.assert_log_lines([], stream=output)
3108            self.apply_config(self.config9b)
3109            # Message should now be output
3110            logger.info(self.next_message())
3111            self.assert_log_lines([
3112                ('INFO', '3'),
3113            ], stream=output)
3114
3115    def test_config_10_ok(self):
3116        with support.captured_stdout() as output:
3117            self.apply_config(self.config10)
3118            logger = logging.getLogger("compiler.parser")
3119            logger.warning(self.next_message())
3120            logger = logging.getLogger('compiler')
3121            # Not output, because filtered
3122            logger.warning(self.next_message())
3123            logger = logging.getLogger('compiler.lexer')
3124            # Not output, because filtered
3125            logger.warning(self.next_message())
3126            logger = logging.getLogger("compiler.parser.codegen")
3127            # Output, as not filtered
3128            logger.error(self.next_message())
3129            self.assert_log_lines([
3130                ('WARNING', '1'),
3131                ('ERROR', '4'),
3132            ], stream=output)
3133
3134    def test_config11_ok(self):
3135        self.test_config1_ok(self.config11)
3136
3137    def test_config12_failure(self):
3138        self.assertRaises(Exception, self.apply_config, self.config12)
3139
3140    def test_config13_failure(self):
3141        self.assertRaises(Exception, self.apply_config, self.config13)
3142
3143    def test_config14_ok(self):
3144        with support.captured_stdout() as output:
3145            self.apply_config(self.config14)
3146            h = logging._handlers['hand1']
3147            self.assertEqual(h.foo, 'bar')
3148            self.assertEqual(h.terminator, '!\n')
3149            logging.warning('Exclamation')
3150            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3151
3152    def test_config15_ok(self):
3153
3154        def cleanup(h1, fn):
3155            h1.close()
3156            os.remove(fn)
3157
3158        with self.check_no_resource_warning():
3159            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
3160            os.close(fd)
3161
3162            config = {
3163                "version": 1,
3164                "handlers": {
3165                    "file": {
3166                        "class": "logging.FileHandler",
3167                        "filename": fn
3168                    }
3169                },
3170                "root": {
3171                    "handlers": ["file"]
3172                }
3173            }
3174
3175            self.apply_config(config)
3176            self.apply_config(config)
3177
3178        handler = logging.root.handlers[0]
3179        self.addCleanup(cleanup, handler, fn)
3180
3181    def setup_via_listener(self, text, verify=None):
3182        text = text.encode("utf-8")
3183        # Ask for a randomly assigned port (by using port 0)
3184        t = logging.config.listen(0, verify)
3185        t.start()
3186        t.ready.wait()
3187        # Now get the port allocated
3188        port = t.port
3189        t.ready.clear()
3190        try:
3191            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3192            sock.settimeout(2.0)
3193            sock.connect(('localhost', port))
3194
3195            slen = struct.pack('>L', len(text))
3196            s = slen + text
3197            sentsofar = 0
3198            left = len(s)
3199            while left > 0:
3200                sent = sock.send(s[sentsofar:])
3201                sentsofar += sent
3202                left -= sent
3203            sock.close()
3204        finally:
3205            t.ready.wait(2.0)
3206            logging.config.stopListening()
3207            support.join_thread(t, 2.0)
3208
3209    def test_listen_config_10_ok(self):
3210        with support.captured_stdout() as output:
3211            self.setup_via_listener(json.dumps(self.config10))
3212            logger = logging.getLogger("compiler.parser")
3213            logger.warning(self.next_message())
3214            logger = logging.getLogger('compiler')
3215            # Not output, because filtered
3216            logger.warning(self.next_message())
3217            logger = logging.getLogger('compiler.lexer')
3218            # Not output, because filtered
3219            logger.warning(self.next_message())
3220            logger = logging.getLogger("compiler.parser.codegen")
3221            # Output, as not filtered
3222            logger.error(self.next_message())
3223            self.assert_log_lines([
3224                ('WARNING', '1'),
3225                ('ERROR', '4'),
3226            ], stream=output)
3227
3228    def test_listen_config_1_ok(self):
3229        with support.captured_stdout() as output:
3230            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
3231            logger = logging.getLogger("compiler.parser")
3232            # Both will output a message
3233            logger.info(self.next_message())
3234            logger.error(self.next_message())
3235            self.assert_log_lines([
3236                ('INFO', '1'),
3237                ('ERROR', '2'),
3238            ], stream=output)
3239            # Original logger output is empty.
3240            self.assert_log_lines([])
3241
3242    def test_listen_verify(self):
3243
3244        def verify_fail(stuff):
3245            return None
3246
3247        def verify_reverse(stuff):
3248            return stuff[::-1]
3249
3250        logger = logging.getLogger("compiler.parser")
3251        to_send = textwrap.dedent(ConfigFileTest.config1)
3252        # First, specify a verification function that will fail.
3253        # We expect to see no output, since our configuration
3254        # never took effect.
3255        with support.captured_stdout() as output:
3256            self.setup_via_listener(to_send, verify_fail)
3257            # Both will output a message
3258            logger.info(self.next_message())
3259            logger.error(self.next_message())
3260        self.assert_log_lines([], stream=output)
3261        # Original logger output has the stuff we logged.
3262        self.assert_log_lines([
3263            ('INFO', '1'),
3264            ('ERROR', '2'),
3265        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3266
3267        # Now, perform no verification. Our configuration
3268        # should take effect.
3269
3270        with support.captured_stdout() as output:
3271            self.setup_via_listener(to_send)    # no verify callable specified
3272            logger = logging.getLogger("compiler.parser")
3273            # Both will output a message
3274            logger.info(self.next_message())
3275            logger.error(self.next_message())
3276        self.assert_log_lines([
3277            ('INFO', '3'),
3278            ('ERROR', '4'),
3279        ], stream=output)
3280        # Original logger output still has the stuff we logged before.
3281        self.assert_log_lines([
3282            ('INFO', '1'),
3283            ('ERROR', '2'),
3284        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3285
3286        # Now, perform verification which transforms the bytes.
3287
3288        with support.captured_stdout() as output:
3289            self.setup_via_listener(to_send[::-1], verify_reverse)
3290            logger = logging.getLogger("compiler.parser")
3291            # Both will output a message
3292            logger.info(self.next_message())
3293            logger.error(self.next_message())
3294        self.assert_log_lines([
3295            ('INFO', '5'),
3296            ('ERROR', '6'),
3297        ], stream=output)
3298        # Original logger output still has the stuff we logged before.
3299        self.assert_log_lines([
3300            ('INFO', '1'),
3301            ('ERROR', '2'),
3302        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3303
3304    def test_out_of_order(self):
3305        self.assertRaises(ValueError, self.apply_config, self.out_of_order)
3306
3307    def test_out_of_order_with_dollar_style(self):
3308        config = copy.deepcopy(self.out_of_order)
3309        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"
3310
3311        self.apply_config(config)
3312        handler = logging.getLogger('mymodule').handlers[0]
3313        self.assertIsInstance(handler.target, logging.Handler)
3314        self.assertIsInstance(handler.formatter._style,
3315                              logging.StringTemplateStyle)
3316
3317    def test_custom_formatter_class_with_validate(self):
3318        self.apply_config(self.custom_formatter_class_validate)
3319        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3320        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3321
3322    def test_custom_formatter_class_with_validate2(self):
3323        self.apply_config(self.custom_formatter_class_validate2)
3324        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3325        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3326
3327    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
3328        config = self.custom_formatter_class_validate.copy()
3329        config['formatters']['form1']['style'] = "$"
3330
3331        # Exception should not be raise as we have configured 'validate' to False
3332        self.apply_config(config)
3333        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3334        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3335
3336    def test_custom_formatter_class_with_validate3(self):
3337        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)
3338
3339    def test_custom_formatter_function_with_validate(self):
3340        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)
3341
3342    def test_baseconfig(self):
3343        d = {
3344            'atuple': (1, 2, 3),
3345            'alist': ['a', 'b', 'c'],
3346            'adict': {'d': 'e', 'f': 3 },
3347            'nest1': ('g', ('h', 'i'), 'j'),
3348            'nest2': ['k', ['l', 'm'], 'n'],
3349            'nest3': ['o', 'cfg://alist', 'p'],
3350        }
3351        bc = logging.config.BaseConfigurator(d)
3352        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
3353        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
3354        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
3355        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
3356        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
3357        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
3358        v = bc.convert('cfg://nest3')
3359        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
3360        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
3361        self.assertRaises(ValueError, bc.convert, 'cfg://!')
3362        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
3363
3364class ManagerTest(BaseTest):
3365    def test_manager_loggerclass(self):
3366        logged = []
3367
3368        class MyLogger(logging.Logger):
3369            def _log(self, level, msg, args, exc_info=None, extra=None):
3370                logged.append(msg)
3371
3372        man = logging.Manager(None)
3373        self.assertRaises(TypeError, man.setLoggerClass, int)
3374        man.setLoggerClass(MyLogger)
3375        logger = man.getLogger('test')
3376        logger.warning('should appear in logged')
3377        logging.warning('should not appear in logged')
3378
3379        self.assertEqual(logged, ['should appear in logged'])
3380
3381    def test_set_log_record_factory(self):
3382        man = logging.Manager(None)
3383        expected = object()
3384        man.setLogRecordFactory(expected)
3385        self.assertEqual(man.logRecordFactory, expected)
3386
3387class ChildLoggerTest(BaseTest):
3388    def test_child_loggers(self):
3389        r = logging.getLogger()
3390        l1 = logging.getLogger('abc')
3391        l2 = logging.getLogger('def.ghi')
3392        c1 = r.getChild('xyz')
3393        c2 = r.getChild('uvw.xyz')
3394        self.assertIs(c1, logging.getLogger('xyz'))
3395        self.assertIs(c2, logging.getLogger('uvw.xyz'))
3396        c1 = l1.getChild('def')
3397        c2 = c1.getChild('ghi')
3398        c3 = l1.getChild('def.ghi')
3399        self.assertIs(c1, logging.getLogger('abc.def'))
3400        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
3401        self.assertIs(c2, c3)
3402
3403
3404class DerivedLogRecord(logging.LogRecord):
3405    pass
3406
3407class LogRecordFactoryTest(BaseTest):
3408
3409    def setUp(self):
3410        class CheckingFilter(logging.Filter):
3411            def __init__(self, cls):
3412                self.cls = cls
3413
3414            def filter(self, record):
3415                t = type(record)
3416                if t is not self.cls:
3417                    msg = 'Unexpected LogRecord type %s, expected %s' % (t,
3418                            self.cls)
3419                    raise TypeError(msg)
3420                return True
3421
3422        BaseTest.setUp(self)
3423        self.filter = CheckingFilter(DerivedLogRecord)
3424        self.root_logger.addFilter(self.filter)
3425        self.orig_factory = logging.getLogRecordFactory()
3426
3427    def tearDown(self):
3428        self.root_logger.removeFilter(self.filter)
3429        BaseTest.tearDown(self)
3430        logging.setLogRecordFactory(self.orig_factory)
3431
3432    def test_logrecord_class(self):
3433        self.assertRaises(TypeError, self.root_logger.warning,
3434                          self.next_message())
3435        logging.setLogRecordFactory(DerivedLogRecord)
3436        self.root_logger.error(self.next_message())
3437        self.assert_log_lines([
3438           ('root', 'ERROR', '2'),
3439        ])
3440
3441
3442class QueueHandlerTest(BaseTest):
3443    # Do not bother with a logger name group.
3444    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
3445
3446    def setUp(self):
3447        BaseTest.setUp(self)
3448        self.queue = queue.Queue(-1)
3449        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
3450        self.name = 'que'
3451        self.que_logger = logging.getLogger('que')
3452        self.que_logger.propagate = False
3453        self.que_logger.setLevel(logging.WARNING)
3454        self.que_logger.addHandler(self.que_hdlr)
3455
3456    def tearDown(self):
3457        self.que_hdlr.close()
3458        BaseTest.tearDown(self)
3459
3460    def test_queue_handler(self):
3461        self.que_logger.debug(self.next_message())
3462        self.assertRaises(queue.Empty, self.queue.get_nowait)
3463        self.que_logger.info(self.next_message())
3464        self.assertRaises(queue.Empty, self.queue.get_nowait)
3465        msg = self.next_message()
3466        self.que_logger.warning(msg)
3467        data = self.queue.get_nowait()
3468        self.assertTrue(isinstance(data, logging.LogRecord))
3469        self.assertEqual(data.name, self.que_logger.name)
3470        self.assertEqual((data.msg, data.args), (msg, None))
3471
3472    def test_formatting(self):
3473        msg = self.next_message()
3474        levelname = logging.getLevelName(logging.WARNING)
3475        log_format_str = '{name} -> {levelname}: {message}'
3476        formatted_msg = log_format_str.format(name=self.name,
3477                                              levelname=levelname, message=msg)
3478        formatter = logging.Formatter(self.log_format)
3479        self.que_hdlr.setFormatter(formatter)
3480        self.que_logger.warning(msg)
3481        log_record = self.queue.get_nowait()
3482        self.assertEqual(formatted_msg, log_record.msg)
3483        self.assertEqual(formatted_msg, log_record.message)
3484
3485    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3486                         'logging.handlers.QueueListener required for this test')
3487    def test_queue_listener(self):
3488        handler = support.TestHandler(support.Matcher())
3489        listener = logging.handlers.QueueListener(self.queue, handler)
3490        listener.start()
3491        try:
3492            self.que_logger.warning(self.next_message())
3493            self.que_logger.error(self.next_message())
3494            self.que_logger.critical(self.next_message())
3495        finally:
3496            listener.stop()
3497        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
3498        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
3499        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
3500        handler.close()
3501
3502        # Now test with respect_handler_level set
3503
3504        handler = support.TestHandler(support.Matcher())
3505        handler.setLevel(logging.CRITICAL)
3506        listener = logging.handlers.QueueListener(self.queue, handler,
3507                                                  respect_handler_level=True)
3508        listener.start()
3509        try:
3510            self.que_logger.warning(self.next_message())
3511            self.que_logger.error(self.next_message())
3512            self.que_logger.critical(self.next_message())
3513        finally:
3514            listener.stop()
3515        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
3516        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
3517        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
3518        handler.close()
3519
3520    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3521                         'logging.handlers.QueueListener required for this test')
3522    def test_queue_listener_with_StreamHandler(self):
3523        # Test that traceback only appends once (bpo-34334).
3524        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
3525        listener.start()
3526        try:
3527            1 / 0
3528        except ZeroDivisionError as e:
3529            exc = e
3530            self.que_logger.exception(self.next_message(), exc_info=exc)
3531        listener.stop()
3532        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
3533
3534    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3535                         'logging.handlers.QueueListener required for this test')
3536    def test_queue_listener_with_multiple_handlers(self):
3537        # Test that queue handler format doesn't affect other handler formats (bpo-35726).
3538        self.que_hdlr.setFormatter(self.root_formatter)
3539        self.que_logger.addHandler(self.root_hdlr)
3540
3541        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
3542        listener.start()
3543        self.que_logger.error("error")
3544        listener.stop()
3545        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
3546
3547if hasattr(logging.handlers, 'QueueListener'):
3548    import multiprocessing
3549    from unittest.mock import patch
3550
3551    class QueueListenerTest(BaseTest):
3552        """
3553        Tests based on patch submitted for issue #27930. Ensure that
3554        QueueListener handles all log messages.
3555        """
3556
3557        repeat = 20
3558
3559        @staticmethod
3560        def setup_and_log(log_queue, ident):
3561            """
3562            Creates a logger with a QueueHandler that logs to a queue read by a
3563            QueueListener. Starts the listener, logs five messages, and stops
3564            the listener.
3565            """
3566            logger = logging.getLogger('test_logger_with_id_%s' % ident)
3567            logger.setLevel(logging.DEBUG)
3568            handler = logging.handlers.QueueHandler(log_queue)
3569            logger.addHandler(handler)
3570            listener = logging.handlers.QueueListener(log_queue)
3571            listener.start()
3572
3573            logger.info('one')
3574            logger.info('two')
3575            logger.info('three')
3576            logger.info('four')
3577            logger.info('five')
3578
3579            listener.stop()
3580            logger.removeHandler(handler)
3581            handler.close()
3582
3583        @patch.object(logging.handlers.QueueListener, 'handle')
3584        def test_handle_called_with_queue_queue(self, mock_handle):
3585            for i in range(self.repeat):
3586                log_queue = queue.Queue()
3587                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3588            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3589                             'correct number of handled log messages')
3590
3591        @patch.object(logging.handlers.QueueListener, 'handle')
3592        def test_handle_called_with_mp_queue(self, mock_handle):
3593            # Issue 28668: The multiprocessing (mp) module is not functional
3594            # when the mp.synchronize module cannot be imported.
3595            support.import_module('multiprocessing.synchronize')
3596            for i in range(self.repeat):
3597                log_queue = multiprocessing.Queue()
3598                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3599                log_queue.close()
3600                log_queue.join_thread()
3601            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3602                             'correct number of handled log messages')
3603
3604        @staticmethod
3605        def get_all_from_queue(log_queue):
3606            try:
3607                while True:
3608                    yield log_queue.get_nowait()
3609            except queue.Empty:
3610                return []
3611
3612        def test_no_messages_in_queue_after_stop(self):
3613            """
3614            Five messages are logged then the QueueListener is stopped. This
3615            test then gets everything off the queue. Failure of this test
3616            indicates that messages were not registered on the queue until
3617            _after_ the QueueListener stopped.
3618            """
3619            # Issue 28668: The multiprocessing (mp) module is not functional
3620            # when the mp.synchronize module cannot be imported.
3621            support.import_module('multiprocessing.synchronize')
3622            for i in range(self.repeat):
3623                queue = multiprocessing.Queue()
3624                self.setup_and_log(queue, '%s_%s' %(self.id(), i))
3625                # time.sleep(1)
3626                items = list(self.get_all_from_queue(queue))
3627                queue.close()
3628                queue.join_thread()
3629
3630                expected = [[], [logging.handlers.QueueListener._sentinel]]
3631                self.assertIn(items, expected,
3632                              'Found unexpected messages in queue: %s' % (
3633                                    [m.msg if isinstance(m, logging.LogRecord)
3634                                     else m for m in items]))
3635
3636        def test_calls_task_done_after_stop(self):
3637            # Issue 36813: Make sure queue.join does not deadlock.
3638            log_queue = queue.Queue()
3639            listener = logging.handlers.QueueListener(log_queue)
3640            listener.start()
3641            listener.stop()
3642            with self.assertRaises(ValueError):
3643                # Make sure all tasks are done and .join won't block.
3644                log_queue.task_done()
3645
3646
3647ZERO = datetime.timedelta(0)
3648
3649class UTC(datetime.tzinfo):
3650    def utcoffset(self, dt):
3651        return ZERO
3652
3653    dst = utcoffset
3654
3655    def tzname(self, dt):
3656        return 'UTC'
3657
3658utc = UTC()
3659
3660class FormatterTest(unittest.TestCase):
3661    def setUp(self):
3662        self.common = {
3663            'name': 'formatter.test',
3664            'level': logging.DEBUG,
3665            'pathname': os.path.join('path', 'to', 'dummy.ext'),
3666            'lineno': 42,
3667            'exc_info': None,
3668            'func': None,
3669            'msg': 'Message with %d %s',
3670            'args': (2, 'placeholders'),
3671        }
3672        self.variants = {
3673        }
3674
3675    def get_record(self, name=None):
3676        result = dict(self.common)
3677        if name is not None:
3678            result.update(self.variants[name])
3679        return logging.makeLogRecord(result)
3680
3681    def assert_error_message(self, exception, message, *args, **kwargs):
3682        try:
3683            self.assertRaises(exception, *args, **kwargs)
3684        except exception as e:
3685            self.assertEqual(message, e.message)
3686
3687    def test_percent(self):
3688        # Test %-formatting
3689        r = self.get_record()
3690        f = logging.Formatter('${%(message)s}')
3691        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
3692        f = logging.Formatter('%(random)s')
3693        self.assertRaises(ValueError, f.format, r)
3694        self.assertFalse(f.usesTime())
3695        f = logging.Formatter('%(asctime)s')
3696        self.assertTrue(f.usesTime())
3697        f = logging.Formatter('%(asctime)-15s')
3698        self.assertTrue(f.usesTime())
3699        f = logging.Formatter('%(asctime)#15s')
3700        self.assertTrue(f.usesTime())
3701
3702    def test_braces(self):
3703        # Test {}-formatting
3704        r = self.get_record()
3705        f = logging.Formatter('$%{message}%$', style='{')
3706        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3707        f = logging.Formatter('{random}', style='{')
3708        self.assertRaises(ValueError, f.format, r)
3709        f = logging.Formatter("{message}", style='{')
3710        self.assertFalse(f.usesTime())
3711        f = logging.Formatter('{asctime}', style='{')
3712        self.assertTrue(f.usesTime())
3713        f = logging.Formatter('{asctime!s:15}', style='{')
3714        self.assertTrue(f.usesTime())
3715        f = logging.Formatter('{asctime:15}', style='{')
3716        self.assertTrue(f.usesTime())
3717
3718    def test_dollars(self):
3719        # Test $-formatting
3720        r = self.get_record()
3721        f = logging.Formatter('${message}', style='$')
3722        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3723        f = logging.Formatter('$message', style='$')
3724        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3725        f = logging.Formatter('$$%${message}%$$', style='$')
3726        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3727        f = logging.Formatter('${random}', style='$')
3728        self.assertRaises(ValueError, f.format, r)
3729        self.assertFalse(f.usesTime())
3730        f = logging.Formatter('${asctime}', style='$')
3731        self.assertTrue(f.usesTime())
3732        f = logging.Formatter('$asctime', style='$')
3733        self.assertTrue(f.usesTime())
3734        f = logging.Formatter('${message}', style='$')
3735        self.assertFalse(f.usesTime())
3736        f = logging.Formatter('${asctime}--', style='$')
3737        self.assertTrue(f.usesTime())
3738
3739    def test_format_validate(self):
3740        # Check correct formatting
3741        # Percentage style
3742        f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3743        self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3744        f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3745        self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3746        f = logging.Formatter("%(process)#+027.23X")
3747        self.assertEqual(f._fmt, "%(process)#+027.23X")
3748        f = logging.Formatter("%(foo)#.*g")
3749        self.assertEqual(f._fmt, "%(foo)#.*g")
3750
3751        # StrFormat Style
3752        f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{")
3753        self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}")
3754        f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{")
3755        self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}")
3756        f = logging.Formatter("{customfield!s:#<30}", style="{")
3757        self.assertEqual(f._fmt, "{customfield!s:#<30}")
3758        f = logging.Formatter("{message!r}", style="{")
3759        self.assertEqual(f._fmt, "{message!r}")
3760        f = logging.Formatter("{message!s}", style="{")
3761        self.assertEqual(f._fmt, "{message!s}")
3762        f = logging.Formatter("{message!a}", style="{")
3763        self.assertEqual(f._fmt, "{message!a}")
3764        f = logging.Formatter("{process!r:4.2}", style="{")
3765        self.assertEqual(f._fmt, "{process!r:4.2}")
3766        f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{")
3767        self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}")
3768        f = logging.Formatter("{process!s:{w},.{p}}", style="{")
3769        self.assertEqual(f._fmt, "{process!s:{w},.{p}}")
3770        f = logging.Formatter("{foo:12.{p}}", style="{")
3771        self.assertEqual(f._fmt, "{foo:12.{p}}")
3772        f = logging.Formatter("{foo:{w}.6}", style="{")
3773        self.assertEqual(f._fmt, "{foo:{w}.6}")
3774        f = logging.Formatter("{foo[0].bar[1].baz}", style="{")
3775        self.assertEqual(f._fmt, "{foo[0].bar[1].baz}")
3776        f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{")
3777        self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}")
3778        f = logging.Formatter("{12[k1].bar[k2].baz}", style="{")
3779        self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}")
3780
3781        # Dollar style
3782        f = logging.Formatter("${asctime} - $message", style="$")
3783        self.assertEqual(f._fmt, "${asctime} - $message")
3784        f = logging.Formatter("$bar $$", style="$")
3785        self.assertEqual(f._fmt, "$bar $$")
3786        f = logging.Formatter("$bar $$$$", style="$")
3787        self.assertEqual(f._fmt, "$bar $$$$")  # this would print two $($$)
3788
3789        # Testing when ValueError being raised from incorrect format
3790        # Percentage Style
3791        self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z")
3792        self.assertRaises(ValueError, logging.Formatter, "%(asctime)b")
3793        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*")
3794        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s")
3795        self.assertRaises(ValueError, logging.Formatter, "%(asctime)_")
3796        self.assertRaises(ValueError, logging.Formatter, '{asctime}')
3797        self.assertRaises(ValueError, logging.Formatter, '${message}')
3798        self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f')  # with both * and decimal number as precision
3799        self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f')
3800
3801        # StrFormat Style
3802        # Testing failure for '-' in field name
3803        self.assert_error_message(
3804            ValueError,
3805            "invalid field name/expression: 'name-thing'",
3806            logging.Formatter, "{name-thing}", style="{"
3807        )
3808        # Testing failure for style mismatch
3809        self.assert_error_message(
3810            ValueError,
3811            "invalid format: no fields",
3812            logging.Formatter, '%(asctime)s', style='{'
3813        )
3814        # Testing failure for invalid conversion
3815        self.assert_error_message(
3816            ValueError,
3817            "invalid conversion: 'Z'"
3818        )
3819        self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{')
3820        self.assert_error_message(
3821            ValueError,
3822            "invalid format: expected ':' after conversion specifier",
3823            logging.Formatter, '{asctime!aa:15}', style='{'
3824        )
3825        # Testing failure for invalid spec
3826        self.assert_error_message(
3827            ValueError,
3828            "bad specifier: '.2ff'",
3829            logging.Formatter, '{process:.2ff}', style='{'
3830        )
3831        self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{')
3832        self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{')
3833        self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{')
3834        self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{')
3835        # Testing failure for mismatch braces
3836        self.assert_error_message(
3837            ValueError,
3838            "invalid format: unmatched '{' in format spec",
3839            logging.Formatter, '{process', style='{'
3840        )
3841        self.assert_error_message(
3842            ValueError,
3843            "invalid format: unmatched '{' in format spec",
3844            logging.Formatter, 'process}', style='{'
3845        )
3846        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{')
3847        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{')
3848        self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{')
3849        self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{')
3850        self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{')
3851        self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{')
3852        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{')
3853        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{')
3854        self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{')
3855
3856        # Dollar style
3857        # Testing failure for mismatch bare $
3858        self.assert_error_message(
3859            ValueError,
3860            "invalid format: bare \'$\' not allowed",
3861            logging.Formatter, '$bar $$$', style='$'
3862        )
3863        self.assert_error_message(
3864            ValueError,
3865            "invalid format: bare \'$\' not allowed",
3866            logging.Formatter, 'bar $', style='$'
3867        )
3868        self.assert_error_message(
3869            ValueError,
3870            "invalid format: bare \'$\' not allowed",
3871            logging.Formatter, 'foo $.', style='$'
3872        )
3873        # Testing failure for mismatch style
3874        self.assert_error_message(
3875            ValueError,
3876            "invalid format: no fields",
3877            logging.Formatter, '{asctime}', style='$'
3878        )
3879        self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$')
3880
3881        # Testing failure for incorrect fields
3882        self.assert_error_message(
3883            ValueError,
3884            "invalid format: no fields",
3885            logging.Formatter, 'foo', style='$'
3886        )
3887        self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$')
3888
3889    def test_invalid_style(self):
3890        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
3891
3892    def test_time(self):
3893        r = self.get_record()
3894        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
3895        # We use None to indicate we want the local timezone
3896        # We're essentially converting a UTC time to local time
3897        r.created = time.mktime(dt.astimezone(None).timetuple())
3898        r.msecs = 123
3899        f = logging.Formatter('%(asctime)s %(message)s')
3900        f.converter = time.gmtime
3901        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
3902        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
3903        f.format(r)
3904        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
3905
3906class TestBufferingFormatter(logging.BufferingFormatter):
3907    def formatHeader(self, records):
3908        return '[(%d)' % len(records)
3909
3910    def formatFooter(self, records):
3911        return '(%d)]' % len(records)
3912
3913class BufferingFormatterTest(unittest.TestCase):
3914    def setUp(self):
3915        self.records = [
3916            logging.makeLogRecord({'msg': 'one'}),
3917            logging.makeLogRecord({'msg': 'two'}),
3918        ]
3919
3920    def test_default(self):
3921        f = logging.BufferingFormatter()
3922        self.assertEqual('', f.format([]))
3923        self.assertEqual('onetwo', f.format(self.records))
3924
3925    def test_custom(self):
3926        f = TestBufferingFormatter()
3927        self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
3928        lf = logging.Formatter('<%(message)s>')
3929        f = TestBufferingFormatter(lf)
3930        self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
3931
3932class ExceptionTest(BaseTest):
3933    def test_formatting(self):
3934        r = self.root_logger
3935        h = RecordingHandler()
3936        r.addHandler(h)
3937        try:
3938            raise RuntimeError('deliberate mistake')
3939        except:
3940            logging.exception('failed', stack_info=True)
3941        r.removeHandler(h)
3942        h.close()
3943        r = h.records[0]
3944        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
3945                                              'call last):\n'))
3946        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
3947                                            'deliberate mistake'))
3948        self.assertTrue(r.stack_info.startswith('Stack (most recent '
3949                                              'call last):\n'))
3950        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
3951                                            'stack_info=True)'))
3952
3953
3954class LastResortTest(BaseTest):
3955    def test_last_resort(self):
3956        # Test the last resort handler
3957        root = self.root_logger
3958        root.removeHandler(self.root_hdlr)
3959        old_lastresort = logging.lastResort
3960        old_raise_exceptions = logging.raiseExceptions
3961
3962        try:
3963            with support.captured_stderr() as stderr:
3964                root.debug('This should not appear')
3965                self.assertEqual(stderr.getvalue(), '')
3966                root.warning('Final chance!')
3967                self.assertEqual(stderr.getvalue(), 'Final chance!\n')
3968
3969            # No handlers and no last resort, so 'No handlers' message
3970            logging.lastResort = None
3971            with support.captured_stderr() as stderr:
3972                root.warning('Final chance!')
3973                msg = 'No handlers could be found for logger "root"\n'
3974                self.assertEqual(stderr.getvalue(), msg)
3975
3976            # 'No handlers' message only printed once
3977            with support.captured_stderr() as stderr:
3978                root.warning('Final chance!')
3979                self.assertEqual(stderr.getvalue(), '')
3980
3981            # If raiseExceptions is False, no message is printed
3982            root.manager.emittedNoHandlerWarning = False
3983            logging.raiseExceptions = False
3984            with support.captured_stderr() as stderr:
3985                root.warning('Final chance!')
3986                self.assertEqual(stderr.getvalue(), '')
3987        finally:
3988            root.addHandler(self.root_hdlr)
3989            logging.lastResort = old_lastresort
3990            logging.raiseExceptions = old_raise_exceptions
3991
3992
3993class FakeHandler:
3994
3995    def __init__(self, identifier, called):
3996        for method in ('acquire', 'flush', 'close', 'release'):
3997            setattr(self, method, self.record_call(identifier, method, called))
3998
3999    def record_call(self, identifier, method_name, called):
4000        def inner():
4001            called.append('{} - {}'.format(identifier, method_name))
4002        return inner
4003
4004
4005class RecordingHandler(logging.NullHandler):
4006
4007    def __init__(self, *args, **kwargs):
4008        super(RecordingHandler, self).__init__(*args, **kwargs)
4009        self.records = []
4010
4011    def handle(self, record):
4012        """Keep track of all the emitted records."""
4013        self.records.append(record)
4014
4015
4016class ShutdownTest(BaseTest):
4017
4018    """Test suite for the shutdown method."""
4019
4020    def setUp(self):
4021        super(ShutdownTest, self).setUp()
4022        self.called = []
4023
4024        raise_exceptions = logging.raiseExceptions
4025        self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
4026
4027    def raise_error(self, error):
4028        def inner():
4029            raise error()
4030        return inner
4031
4032    def test_no_failure(self):
4033        # create some fake handlers
4034        handler0 = FakeHandler(0, self.called)
4035        handler1 = FakeHandler(1, self.called)
4036        handler2 = FakeHandler(2, self.called)
4037
4038        # create live weakref to those handlers
4039        handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
4040
4041        logging.shutdown(handlerList=list(handlers))
4042
4043        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
4044                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
4045                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
4046        self.assertEqual(expected, self.called)
4047
4048    def _test_with_failure_in_method(self, method, error):
4049        handler = FakeHandler(0, self.called)
4050        setattr(handler, method, self.raise_error(error))
4051        handlers = [logging.weakref.ref(handler)]
4052
4053        logging.shutdown(handlerList=list(handlers))
4054
4055        self.assertEqual('0 - release', self.called[-1])
4056
4057    def test_with_ioerror_in_acquire(self):
4058        self._test_with_failure_in_method('acquire', OSError)
4059
4060    def test_with_ioerror_in_flush(self):
4061        self._test_with_failure_in_method('flush', OSError)
4062
4063    def test_with_ioerror_in_close(self):
4064        self._test_with_failure_in_method('close', OSError)
4065
4066    def test_with_valueerror_in_acquire(self):
4067        self._test_with_failure_in_method('acquire', ValueError)
4068
4069    def test_with_valueerror_in_flush(self):
4070        self._test_with_failure_in_method('flush', ValueError)
4071
4072    def test_with_valueerror_in_close(self):
4073        self._test_with_failure_in_method('close', ValueError)
4074
4075    def test_with_other_error_in_acquire_without_raise(self):
4076        logging.raiseExceptions = False
4077        self._test_with_failure_in_method('acquire', IndexError)
4078
4079    def test_with_other_error_in_flush_without_raise(self):
4080        logging.raiseExceptions = False
4081        self._test_with_failure_in_method('flush', IndexError)
4082
4083    def test_with_other_error_in_close_without_raise(self):
4084        logging.raiseExceptions = False
4085        self._test_with_failure_in_method('close', IndexError)
4086
4087    def test_with_other_error_in_acquire_with_raise(self):
4088        logging.raiseExceptions = True
4089        self.assertRaises(IndexError, self._test_with_failure_in_method,
4090                          'acquire', IndexError)
4091
4092    def test_with_other_error_in_flush_with_raise(self):
4093        logging.raiseExceptions = True
4094        self.assertRaises(IndexError, self._test_with_failure_in_method,
4095                          'flush', IndexError)
4096
4097    def test_with_other_error_in_close_with_raise(self):
4098        logging.raiseExceptions = True
4099        self.assertRaises(IndexError, self._test_with_failure_in_method,
4100                          'close', IndexError)
4101
4102
4103class ModuleLevelMiscTest(BaseTest):
4104
4105    """Test suite for some module level methods."""
4106
4107    def test_disable(self):
4108        old_disable = logging.root.manager.disable
4109        # confirm our assumptions are correct
4110        self.assertEqual(old_disable, 0)
4111        self.addCleanup(logging.disable, old_disable)
4112
4113        logging.disable(83)
4114        self.assertEqual(logging.root.manager.disable, 83)
4115
4116        # test the default value introduced in 3.7
4117        # (Issue #28524)
4118        logging.disable()
4119        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)
4120
4121    def _test_log(self, method, level=None):
4122        called = []
4123        support.patch(self, logging, 'basicConfig',
4124                      lambda *a, **kw: called.append((a, kw)))
4125
4126        recording = RecordingHandler()
4127        logging.root.addHandler(recording)
4128
4129        log_method = getattr(logging, method)
4130        if level is not None:
4131            log_method(level, "test me: %r", recording)
4132        else:
4133            log_method("test me: %r", recording)
4134
4135        self.assertEqual(len(recording.records), 1)
4136        record = recording.records[0]
4137        self.assertEqual(record.getMessage(), "test me: %r" % recording)
4138
4139        expected_level = level if level is not None else getattr(logging, method.upper())
4140        self.assertEqual(record.levelno, expected_level)
4141
4142        # basicConfig was not called!
4143        self.assertEqual(called, [])
4144
4145    def test_log(self):
4146        self._test_log('log', logging.ERROR)
4147
4148    def test_debug(self):
4149        self._test_log('debug')
4150
4151    def test_info(self):
4152        self._test_log('info')
4153
4154    def test_warning(self):
4155        self._test_log('warning')
4156
4157    def test_error(self):
4158        self._test_log('error')
4159
4160    def test_critical(self):
4161        self._test_log('critical')
4162
4163    def test_set_logger_class(self):
4164        self.assertRaises(TypeError, logging.setLoggerClass, object)
4165
4166        class MyLogger(logging.Logger):
4167            pass
4168
4169        logging.setLoggerClass(MyLogger)
4170        self.assertEqual(logging.getLoggerClass(), MyLogger)
4171
4172        logging.setLoggerClass(logging.Logger)
4173        self.assertEqual(logging.getLoggerClass(), logging.Logger)
4174
4175    def test_subclass_logger_cache(self):
4176        # bpo-37258
4177        message = []
4178
4179        class MyLogger(logging.getLoggerClass()):
4180            def __init__(self, name='MyLogger', level=logging.NOTSET):
4181                super().__init__(name, level)
4182                message.append('initialized')
4183
4184        logging.setLoggerClass(MyLogger)
4185        logger = logging.getLogger('just_some_logger')
4186        self.assertEqual(message, ['initialized'])
4187        stream = io.StringIO()
4188        h = logging.StreamHandler(stream)
4189        logger.addHandler(h)
4190        try:
4191            logger.setLevel(logging.DEBUG)
4192            logger.debug("hello")
4193            self.assertEqual(stream.getvalue().strip(), "hello")
4194
4195            stream.truncate(0)
4196            stream.seek(0)
4197
4198            logger.setLevel(logging.INFO)
4199            logger.debug("hello")
4200            self.assertEqual(stream.getvalue(), "")
4201        finally:
4202            logger.removeHandler(h)
4203            h.close()
4204            logging.setLoggerClass(logging.Logger)
4205
4206    @support.requires_type_collecting
4207    def test_logging_at_shutdown(self):
4208        # Issue #20037
4209        code = """if 1:
4210            import logging
4211
4212            class A:
4213                def __del__(self):
4214                    try:
4215                        raise ValueError("some error")
4216                    except Exception:
4217                        logging.exception("exception in __del__")
4218
4219            a = A()"""
4220        rc, out, err = assert_python_ok("-c", code)
4221        err = err.decode()
4222        self.assertIn("exception in __del__", err)
4223        self.assertIn("ValueError: some error", err)
4224
4225    def test_recursion_error(self):
4226        # Issue 36272
4227        code = """if 1:
4228            import logging
4229
4230            def rec():
4231                logging.error("foo")
4232                rec()
4233
4234            rec()"""
4235        rc, out, err = assert_python_failure("-c", code)
4236        err = err.decode()
4237        self.assertNotIn("Cannot recover from stack overflow.", err)
4238        self.assertEqual(rc, 1)
4239
4240
4241class LogRecordTest(BaseTest):
4242    def test_str_rep(self):
4243        r = logging.makeLogRecord({})
4244        s = str(r)
4245        self.assertTrue(s.startswith('<LogRecord: '))
4246        self.assertTrue(s.endswith('>'))
4247
4248    def test_dict_arg(self):
4249        h = RecordingHandler()
4250        r = logging.getLogger()
4251        r.addHandler(h)
4252        d = {'less' : 'more' }
4253        logging.warning('less is %(less)s', d)
4254        self.assertIs(h.records[0].args, d)
4255        self.assertEqual(h.records[0].message, 'less is more')
4256        r.removeHandler(h)
4257        h.close()
4258
4259    def test_multiprocessing(self):
4260        r = logging.makeLogRecord({})
4261        self.assertEqual(r.processName, 'MainProcess')
4262        try:
4263            import multiprocessing as mp
4264            r = logging.makeLogRecord({})
4265            self.assertEqual(r.processName, mp.current_process().name)
4266        except ImportError:
4267            pass
4268
4269    def test_optional(self):
4270        r = logging.makeLogRecord({})
4271        NOT_NONE = self.assertIsNotNone
4272        NOT_NONE(r.thread)
4273        NOT_NONE(r.threadName)
4274        NOT_NONE(r.process)
4275        NOT_NONE(r.processName)
4276        log_threads = logging.logThreads
4277        log_processes = logging.logProcesses
4278        log_multiprocessing = logging.logMultiprocessing
4279        try:
4280            logging.logThreads = False
4281            logging.logProcesses = False
4282            logging.logMultiprocessing = False
4283            r = logging.makeLogRecord({})
4284            NONE = self.assertIsNone
4285            NONE(r.thread)
4286            NONE(r.threadName)
4287            NONE(r.process)
4288            NONE(r.processName)
4289        finally:
4290            logging.logThreads = log_threads
4291            logging.logProcesses = log_processes
4292            logging.logMultiprocessing = log_multiprocessing
4293
4294class BasicConfigTest(unittest.TestCase):
4295
4296    """Test suite for logging.basicConfig."""
4297
4298    def setUp(self):
4299        super(BasicConfigTest, self).setUp()
4300        self.handlers = logging.root.handlers
4301        self.saved_handlers = logging._handlers.copy()
4302        self.saved_handler_list = logging._handlerList[:]
4303        self.original_logging_level = logging.root.level
4304        self.addCleanup(self.cleanup)
4305        logging.root.handlers = []
4306
4307    def tearDown(self):
4308        for h in logging.root.handlers[:]:
4309            logging.root.removeHandler(h)
4310            h.close()
4311        super(BasicConfigTest, self).tearDown()
4312
4313    def cleanup(self):
4314        setattr(logging.root, 'handlers', self.handlers)
4315        logging._handlers.clear()
4316        logging._handlers.update(self.saved_handlers)
4317        logging._handlerList[:] = self.saved_handler_list
4318        logging.root.setLevel(self.original_logging_level)
4319
4320    def test_no_kwargs(self):
4321        logging.basicConfig()
4322
4323        # handler defaults to a StreamHandler to sys.stderr
4324        self.assertEqual(len(logging.root.handlers), 1)
4325        handler = logging.root.handlers[0]
4326        self.assertIsInstance(handler, logging.StreamHandler)
4327        self.assertEqual(handler.stream, sys.stderr)
4328
4329        formatter = handler.formatter
4330        # format defaults to logging.BASIC_FORMAT
4331        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
4332        # datefmt defaults to None
4333        self.assertIsNone(formatter.datefmt)
4334        # style defaults to %
4335        self.assertIsInstance(formatter._style, logging.PercentStyle)
4336
4337        # level is not explicitly set
4338        self.assertEqual(logging.root.level, self.original_logging_level)
4339
4340    def test_strformatstyle(self):
4341        with support.captured_stdout() as output:
4342            logging.basicConfig(stream=sys.stdout, style="{")
4343            logging.error("Log an error")
4344            sys.stdout.seek(0)
4345            self.assertEqual(output.getvalue().strip(),
4346                "ERROR:root:Log an error")
4347
4348    def test_stringtemplatestyle(self):
4349        with support.captured_stdout() as output:
4350            logging.basicConfig(stream=sys.stdout, style="$")
4351            logging.error("Log an error")
4352            sys.stdout.seek(0)
4353            self.assertEqual(output.getvalue().strip(),
4354                "ERROR:root:Log an error")
4355
4356    def test_filename(self):
4357
4358        def cleanup(h1, h2, fn):
4359            h1.close()
4360            h2.close()
4361            os.remove(fn)
4362
4363        logging.basicConfig(filename='test.log')
4364
4365        self.assertEqual(len(logging.root.handlers), 1)
4366        handler = logging.root.handlers[0]
4367        self.assertIsInstance(handler, logging.FileHandler)
4368
4369        expected = logging.FileHandler('test.log', 'a')
4370        self.assertEqual(handler.stream.mode, expected.stream.mode)
4371        self.assertEqual(handler.stream.name, expected.stream.name)
4372        self.addCleanup(cleanup, handler, expected, 'test.log')
4373
4374    def test_filemode(self):
4375
4376        def cleanup(h1, h2, fn):
4377            h1.close()
4378            h2.close()
4379            os.remove(fn)
4380
4381        logging.basicConfig(filename='test.log', filemode='wb')
4382
4383        handler = logging.root.handlers[0]
4384        expected = logging.FileHandler('test.log', 'wb')
4385        self.assertEqual(handler.stream.mode, expected.stream.mode)
4386        self.addCleanup(cleanup, handler, expected, 'test.log')
4387
4388    def test_stream(self):
4389        stream = io.StringIO()
4390        self.addCleanup(stream.close)
4391        logging.basicConfig(stream=stream)
4392
4393        self.assertEqual(len(logging.root.handlers), 1)
4394        handler = logging.root.handlers[0]
4395        self.assertIsInstance(handler, logging.StreamHandler)
4396        self.assertEqual(handler.stream, stream)
4397
4398    def test_format(self):
4399        logging.basicConfig(format='%(asctime)s - %(message)s')
4400
4401        formatter = logging.root.handlers[0].formatter
4402        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')
4403
4404    def test_datefmt(self):
4405        logging.basicConfig(datefmt='bar')
4406
4407        formatter = logging.root.handlers[0].formatter
4408        self.assertEqual(formatter.datefmt, 'bar')
4409
4410    def test_style(self):
4411        logging.basicConfig(style='$')
4412
4413        formatter = logging.root.handlers[0].formatter
4414        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
4415
4416    def test_level(self):
4417        old_level = logging.root.level
4418        self.addCleanup(logging.root.setLevel, old_level)
4419
4420        logging.basicConfig(level=57)
4421        self.assertEqual(logging.root.level, 57)
4422        # Test that second call has no effect
4423        logging.basicConfig(level=58)
4424        self.assertEqual(logging.root.level, 57)
4425
4426    def test_incompatible(self):
4427        assertRaises = self.assertRaises
4428        handlers = [logging.StreamHandler()]
4429        stream = sys.stderr
4430        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4431                                                      stream=stream)
4432        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4433                                                      handlers=handlers)
4434        assertRaises(ValueError, logging.basicConfig, stream=stream,
4435                                                      handlers=handlers)
4436        # Issue 23207: test for invalid kwargs
4437        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
4438        # Should pop both filename and filemode even if filename is None
4439        logging.basicConfig(filename=None, filemode='a')
4440
4441    def test_handlers(self):
4442        handlers = [
4443            logging.StreamHandler(),
4444            logging.StreamHandler(sys.stdout),
4445            logging.StreamHandler(),
4446        ]
4447        f = logging.Formatter()
4448        handlers[2].setFormatter(f)
4449        logging.basicConfig(handlers=handlers)
4450        self.assertIs(handlers[0], logging.root.handlers[0])
4451        self.assertIs(handlers[1], logging.root.handlers[1])
4452        self.assertIs(handlers[2], logging.root.handlers[2])
4453        self.assertIsNotNone(handlers[0].formatter)
4454        self.assertIsNotNone(handlers[1].formatter)
4455        self.assertIs(handlers[2].formatter, f)
4456        self.assertIs(handlers[0].formatter, handlers[1].formatter)
4457
4458    def test_force(self):
4459        old_string_io = io.StringIO()
4460        new_string_io = io.StringIO()
4461        old_handlers = [logging.StreamHandler(old_string_io)]
4462        new_handlers = [logging.StreamHandler(new_string_io)]
4463        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
4464        logging.warning('warn')
4465        logging.info('info')
4466        logging.debug('debug')
4467        self.assertEqual(len(logging.root.handlers), 1)
4468        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
4469                            force=True)
4470        logging.warning('warn')
4471        logging.info('info')
4472        logging.debug('debug')
4473        self.assertEqual(len(logging.root.handlers), 1)
4474        self.assertEqual(old_string_io.getvalue().strip(),
4475                         'WARNING:root:warn')
4476        self.assertEqual(new_string_io.getvalue().strip(),
4477                         'WARNING:root:warn\nINFO:root:info')
4478
4479    def _test_log(self, method, level=None):
4480        # logging.root has no handlers so basicConfig should be called
4481        called = []
4482
4483        old_basic_config = logging.basicConfig
4484        def my_basic_config(*a, **kw):
4485            old_basic_config()
4486            old_level = logging.root.level
4487            logging.root.setLevel(100)  # avoid having messages in stderr
4488            self.addCleanup(logging.root.setLevel, old_level)
4489            called.append((a, kw))
4490
4491        support.patch(self, logging, 'basicConfig', my_basic_config)
4492
4493        log_method = getattr(logging, method)
4494        if level is not None:
4495            log_method(level, "test me")
4496        else:
4497            log_method("test me")
4498
4499        # basicConfig was called with no arguments
4500        self.assertEqual(called, [((), {})])
4501
4502    def test_log(self):
4503        self._test_log('log', logging.WARNING)
4504
4505    def test_debug(self):
4506        self._test_log('debug')
4507
4508    def test_info(self):
4509        self._test_log('info')
4510
4511    def test_warning(self):
4512        self._test_log('warning')
4513
4514    def test_error(self):
4515        self._test_log('error')
4516
4517    def test_critical(self):
4518        self._test_log('critical')
4519
4520
4521class LoggerAdapterTest(unittest.TestCase):
4522    def setUp(self):
4523        super(LoggerAdapterTest, self).setUp()
4524        old_handler_list = logging._handlerList[:]
4525
4526        self.recording = RecordingHandler()
4527        self.logger = logging.root
4528        self.logger.addHandler(self.recording)
4529        self.addCleanup(self.logger.removeHandler, self.recording)
4530        self.addCleanup(self.recording.close)
4531
4532        def cleanup():
4533            logging._handlerList[:] = old_handler_list
4534
4535        self.addCleanup(cleanup)
4536        self.addCleanup(logging.shutdown)
4537        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
4538
4539    def test_exception(self):
4540        msg = 'testing exception: %r'
4541        exc = None
4542        try:
4543            1 / 0
4544        except ZeroDivisionError as e:
4545            exc = e
4546            self.adapter.exception(msg, self.recording)
4547
4548        self.assertEqual(len(self.recording.records), 1)
4549        record = self.recording.records[0]
4550        self.assertEqual(record.levelno, logging.ERROR)
4551        self.assertEqual(record.msg, msg)
4552        self.assertEqual(record.args, (self.recording,))
4553        self.assertEqual(record.exc_info,
4554                         (exc.__class__, exc, exc.__traceback__))
4555
4556    def test_exception_excinfo(self):
4557        try:
4558            1 / 0
4559        except ZeroDivisionError as e:
4560            exc = e
4561
4562        self.adapter.exception('exc_info test', exc_info=exc)
4563
4564        self.assertEqual(len(self.recording.records), 1)
4565        record = self.recording.records[0]
4566        self.assertEqual(record.exc_info,
4567                         (exc.__class__, exc, exc.__traceback__))
4568
4569    def test_critical(self):
4570        msg = 'critical test! %r'
4571        self.adapter.critical(msg, self.recording)
4572
4573        self.assertEqual(len(self.recording.records), 1)
4574        record = self.recording.records[0]
4575        self.assertEqual(record.levelno, logging.CRITICAL)
4576        self.assertEqual(record.msg, msg)
4577        self.assertEqual(record.args, (self.recording,))
4578
4579    def test_is_enabled_for(self):
4580        old_disable = self.adapter.logger.manager.disable
4581        self.adapter.logger.manager.disable = 33
4582        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
4583                        old_disable)
4584        self.assertFalse(self.adapter.isEnabledFor(32))
4585
4586    def test_has_handlers(self):
4587        self.assertTrue(self.adapter.hasHandlers())
4588
4589        for handler in self.logger.handlers:
4590            self.logger.removeHandler(handler)
4591
4592        self.assertFalse(self.logger.hasHandlers())
4593        self.assertFalse(self.adapter.hasHandlers())
4594
4595    def test_nested(self):
4596        class Adapter(logging.LoggerAdapter):
4597            prefix = 'Adapter'
4598
4599            def process(self, msg, kwargs):
4600                return f"{self.prefix} {msg}", kwargs
4601
4602        msg = 'Adapters can be nested, yo.'
4603        adapter = Adapter(logger=self.logger, extra=None)
4604        adapter_adapter = Adapter(logger=adapter, extra=None)
4605        adapter_adapter.prefix = 'AdapterAdapter'
4606        self.assertEqual(repr(adapter), repr(adapter_adapter))
4607        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
4608        self.assertEqual(len(self.recording.records), 1)
4609        record = self.recording.records[0]
4610        self.assertEqual(record.levelno, logging.CRITICAL)
4611        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
4612        self.assertEqual(record.args, (self.recording,))
4613        orig_manager = adapter_adapter.manager
4614        self.assertIs(adapter.manager, orig_manager)
4615        self.assertIs(self.logger.manager, orig_manager)
4616        temp_manager = object()
4617        try:
4618            adapter_adapter.manager = temp_manager
4619            self.assertIs(adapter_adapter.manager, temp_manager)
4620            self.assertIs(adapter.manager, temp_manager)
4621            self.assertIs(self.logger.manager, temp_manager)
4622        finally:
4623            adapter_adapter.manager = orig_manager
4624        self.assertIs(adapter_adapter.manager, orig_manager)
4625        self.assertIs(adapter.manager, orig_manager)
4626        self.assertIs(self.logger.manager, orig_manager)
4627
4628
4629class LoggerTest(BaseTest):
4630
4631    def setUp(self):
4632        super(LoggerTest, self).setUp()
4633        self.recording = RecordingHandler()
4634        self.logger = logging.Logger(name='blah')
4635        self.logger.addHandler(self.recording)
4636        self.addCleanup(self.logger.removeHandler, self.recording)
4637        self.addCleanup(self.recording.close)
4638        self.addCleanup(logging.shutdown)
4639
4640    def test_set_invalid_level(self):
4641        self.assertRaises(TypeError, self.logger.setLevel, object())
4642
4643    def test_exception(self):
4644        msg = 'testing exception: %r'
4645        exc = None
4646        try:
4647            1 / 0
4648        except ZeroDivisionError as e:
4649            exc = e
4650            self.logger.exception(msg, self.recording)
4651
4652        self.assertEqual(len(self.recording.records), 1)
4653        record = self.recording.records[0]
4654        self.assertEqual(record.levelno, logging.ERROR)
4655        self.assertEqual(record.msg, msg)
4656        self.assertEqual(record.args, (self.recording,))
4657        self.assertEqual(record.exc_info,
4658                         (exc.__class__, exc, exc.__traceback__))
4659
4660    def test_log_invalid_level_with_raise(self):
4661        with support.swap_attr(logging, 'raiseExceptions', True):
4662            self.assertRaises(TypeError, self.logger.log, '10', 'test message')
4663
4664    def test_log_invalid_level_no_raise(self):
4665        with support.swap_attr(logging, 'raiseExceptions', False):
4666            self.logger.log('10', 'test message')  # no exception happens
4667
4668    def test_find_caller_with_stack_info(self):
4669        called = []
4670        support.patch(self, logging.traceback, 'print_stack',
4671                      lambda f, file: called.append(file.getvalue()))
4672
4673        self.logger.findCaller(stack_info=True)
4674
4675        self.assertEqual(len(called), 1)
4676        self.assertEqual('Stack (most recent call last):\n', called[0])
4677
4678    def test_find_caller_with_stacklevel(self):
4679        the_level = 1
4680
4681        def innermost():
4682            self.logger.warning('test', stacklevel=the_level)
4683
4684        def inner():
4685            innermost()
4686
4687        def outer():
4688            inner()
4689
4690        records = self.recording.records
4691        outer()
4692        self.assertEqual(records[-1].funcName, 'innermost')
4693        lineno = records[-1].lineno
4694        the_level += 1
4695        outer()
4696        self.assertEqual(records[-1].funcName, 'inner')
4697        self.assertGreater(records[-1].lineno, lineno)
4698        lineno = records[-1].lineno
4699        the_level += 1
4700        outer()
4701        self.assertEqual(records[-1].funcName, 'outer')
4702        self.assertGreater(records[-1].lineno, lineno)
4703        lineno = records[-1].lineno
4704        the_level += 1
4705        outer()
4706        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
4707        self.assertGreater(records[-1].lineno, lineno)
4708
4709    def test_make_record_with_extra_overwrite(self):
4710        name = 'my record'
4711        level = 13
4712        fn = lno = msg = args = exc_info = func = sinfo = None
4713        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
4714                                       exc_info, func, sinfo)
4715
4716        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
4717            extra = {key: 'some value'}
4718            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
4719                              fn, lno, msg, args, exc_info,
4720                              extra=extra, sinfo=sinfo)
4721
4722    def test_make_record_with_extra_no_overwrite(self):
4723        name = 'my record'
4724        level = 13
4725        fn = lno = msg = args = exc_info = func = sinfo = None
4726        extra = {'valid_key': 'some value'}
4727        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
4728                                        exc_info, extra=extra, sinfo=sinfo)
4729        self.assertIn('valid_key', result.__dict__)
4730
4731    def test_has_handlers(self):
4732        self.assertTrue(self.logger.hasHandlers())
4733
4734        for handler in self.logger.handlers:
4735            self.logger.removeHandler(handler)
4736        self.assertFalse(self.logger.hasHandlers())
4737
4738    def test_has_handlers_no_propagate(self):
4739        child_logger = logging.getLogger('blah.child')
4740        child_logger.propagate = False
4741        self.assertFalse(child_logger.hasHandlers())
4742
4743    def test_is_enabled_for(self):
4744        old_disable = self.logger.manager.disable
4745        self.logger.manager.disable = 23
4746        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
4747        self.assertFalse(self.logger.isEnabledFor(22))
4748
4749    def test_is_enabled_for_disabled_logger(self):
4750        old_disabled = self.logger.disabled
4751        old_disable = self.logger.manager.disable
4752
4753        self.logger.disabled = True
4754        self.logger.manager.disable = 21
4755
4756        self.addCleanup(setattr, self.logger, 'disabled', old_disabled)
4757        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
4758
4759        self.assertFalse(self.logger.isEnabledFor(22))
4760
4761    def test_root_logger_aliases(self):
4762        root = logging.getLogger()
4763        self.assertIs(root, logging.root)
4764        self.assertIs(root, logging.getLogger(None))
4765        self.assertIs(root, logging.getLogger(''))
4766        self.assertIs(root, logging.getLogger('foo').root)
4767        self.assertIs(root, logging.getLogger('foo.bar').root)
4768        self.assertIs(root, logging.getLogger('foo').parent)
4769
4770        self.assertIsNot(root, logging.getLogger('\0'))
4771        self.assertIsNot(root, logging.getLogger('foo.bar').parent)
4772
4773    def test_invalid_names(self):
4774        self.assertRaises(TypeError, logging.getLogger, any)
4775        self.assertRaises(TypeError, logging.getLogger, b'foo')
4776
4777    def test_pickling(self):
4778        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
4779            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
4780                logger = logging.getLogger(name)
4781                s = pickle.dumps(logger, proto)
4782                unpickled = pickle.loads(s)
4783                self.assertIs(unpickled, logger)
4784
4785    def test_caching(self):
4786        root = self.root_logger
4787        logger1 = logging.getLogger("abc")
4788        logger2 = logging.getLogger("abc.def")
4789
4790        # Set root logger level and ensure cache is empty
4791        root.setLevel(logging.ERROR)
4792        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
4793        self.assertEqual(logger2._cache, {})
4794
4795        # Ensure cache is populated and calls are consistent
4796        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
4797        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
4798        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
4799        self.assertEqual(root._cache, {})
4800        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
4801
4802        # Ensure root cache gets populated
4803        self.assertEqual(root._cache, {})
4804        self.assertTrue(root.isEnabledFor(logging.ERROR))
4805        self.assertEqual(root._cache, {logging.ERROR: True})
4806
4807        # Set parent logger level and ensure caches are emptied
4808        logger1.setLevel(logging.CRITICAL)
4809        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
4810        self.assertEqual(logger2._cache, {})
4811
4812        # Ensure logger2 uses parent logger's effective level
4813        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
4814
4815        # Set level to NOTSET and ensure caches are empty
4816        logger2.setLevel(logging.NOTSET)
4817        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
4818        self.assertEqual(logger2._cache, {})
4819        self.assertEqual(logger1._cache, {})
4820        self.assertEqual(root._cache, {})
4821
4822        # Verify logger2 follows parent and not root
4823        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
4824        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
4825        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
4826        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
4827        self.assertTrue(root.isEnabledFor(logging.ERROR))
4828
4829        # Disable logging in manager and ensure caches are clear
4830        logging.disable()
4831        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
4832        self.assertEqual(logger2._cache, {})
4833        self.assertEqual(logger1._cache, {})
4834        self.assertEqual(root._cache, {})
4835
4836        # Ensure no loggers are enabled
4837        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
4838        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
4839        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
4840
4841
4842class BaseFileTest(BaseTest):
4843    "Base class for handler tests that write log files"
4844
4845    def setUp(self):
4846        BaseTest.setUp(self)
4847        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
4848        os.close(fd)
4849        self.rmfiles = []
4850
4851    def tearDown(self):
4852        for fn in self.rmfiles:
4853            os.unlink(fn)
4854        if os.path.exists(self.fn):
4855            os.unlink(self.fn)
4856        BaseTest.tearDown(self)
4857
4858    def assertLogFile(self, filename):
4859        "Assert a log file is there and register it for deletion"
4860        self.assertTrue(os.path.exists(filename),
4861                        msg="Log file %r does not exist" % filename)
4862        self.rmfiles.append(filename)
4863
4864
4865class FileHandlerTest(BaseFileTest):
4866    def test_delay(self):
4867        os.unlink(self.fn)
4868        fh = logging.FileHandler(self.fn, delay=True)
4869        self.assertIsNone(fh.stream)
4870        self.assertFalse(os.path.exists(self.fn))
4871        fh.handle(logging.makeLogRecord({}))
4872        self.assertIsNotNone(fh.stream)
4873        self.assertTrue(os.path.exists(self.fn))
4874        fh.close()
4875
4876class RotatingFileHandlerTest(BaseFileTest):
4877    def next_rec(self):
4878        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
4879                                 self.next_message(), None, None, None)
4880
4881    def test_should_not_rollover(self):
4882        # If maxbytes is zero rollover never occurs
4883        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
4884        self.assertFalse(rh.shouldRollover(None))
4885        rh.close()
4886
4887    def test_should_rollover(self):
4888        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
4889        self.assertTrue(rh.shouldRollover(self.next_rec()))
4890        rh.close()
4891
4892    def test_file_created(self):
4893        # checks that the file is created and assumes it was created
4894        # by us
4895        rh = logging.handlers.RotatingFileHandler(self.fn)
4896        rh.emit(self.next_rec())
4897        self.assertLogFile(self.fn)
4898        rh.close()
4899
4900    def test_rollover_filenames(self):
4901        def namer(name):
4902            return name + ".test"
4903        rh = logging.handlers.RotatingFileHandler(
4904            self.fn, backupCount=2, maxBytes=1)
4905        rh.namer = namer
4906        rh.emit(self.next_rec())
4907        self.assertLogFile(self.fn)
4908        rh.emit(self.next_rec())
4909        self.assertLogFile(namer(self.fn + ".1"))
4910        rh.emit(self.next_rec())
4911        self.assertLogFile(namer(self.fn + ".2"))
4912        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
4913        rh.close()
4914
4915    @support.requires_zlib
4916    def test_rotator(self):
4917        def namer(name):
4918            return name + ".gz"
4919
4920        def rotator(source, dest):
4921            with open(source, "rb") as sf:
4922                data = sf.read()
4923                compressed = zlib.compress(data, 9)
4924                with open(dest, "wb") as df:
4925                    df.write(compressed)
4926            os.remove(source)
4927
4928        rh = logging.handlers.RotatingFileHandler(
4929            self.fn, backupCount=2, maxBytes=1)
4930        rh.rotator = rotator
4931        rh.namer = namer
4932        m1 = self.next_rec()
4933        rh.emit(m1)
4934        self.assertLogFile(self.fn)
4935        m2 = self.next_rec()
4936        rh.emit(m2)
4937        fn = namer(self.fn + ".1")
4938        self.assertLogFile(fn)
4939        newline = os.linesep
4940        with open(fn, "rb") as f:
4941            compressed = f.read()
4942            data = zlib.decompress(compressed)
4943            self.assertEqual(data.decode("ascii"), m1.msg + newline)
4944        rh.emit(self.next_rec())
4945        fn = namer(self.fn + ".2")
4946        self.assertLogFile(fn)
4947        with open(fn, "rb") as f:
4948            compressed = f.read()
4949            data = zlib.decompress(compressed)
4950            self.assertEqual(data.decode("ascii"), m1.msg + newline)
4951        rh.emit(self.next_rec())
4952        fn = namer(self.fn + ".2")
4953        with open(fn, "rb") as f:
4954            compressed = f.read()
4955            data = zlib.decompress(compressed)
4956            self.assertEqual(data.decode("ascii"), m2.msg + newline)
4957        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
4958        rh.close()
4959
4960class TimedRotatingFileHandlerTest(BaseFileTest):
4961    # other test methods added below
4962    def test_rollover(self):
4963        fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
4964                                                       backupCount=1)
4965        fmt = logging.Formatter('%(asctime)s %(message)s')
4966        fh.setFormatter(fmt)
4967        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
4968        fh.emit(r1)
4969        self.assertLogFile(self.fn)
4970        time.sleep(1.1)    # a little over a second ...
4971        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
4972        fh.emit(r2)
4973        fh.close()
4974        # At this point, we should have a recent rotated file which we
4975        # can test for the existence of. However, in practice, on some
4976        # machines which run really slowly, we don't know how far back
4977        # in time to go to look for the log file. So, we go back a fair
4978        # bit, and stop as soon as we see a rotated file. In theory this
4979        # could of course still fail, but the chances are lower.
4980        found = False
4981        now = datetime.datetime.now()
4982        GO_BACK = 5 * 60 # seconds
4983        for secs in range(GO_BACK):
4984            prev = now - datetime.timedelta(seconds=secs)
4985            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
4986            found = os.path.exists(fn)
4987            if found:
4988                self.rmfiles.append(fn)
4989                break
4990        msg = 'No rotated files found, went back %d seconds' % GO_BACK
4991        if not found:
4992            # print additional diagnostics
4993            dn, fn = os.path.split(self.fn)
4994            files = [f for f in os.listdir(dn) if f.startswith(fn)]
4995            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
4996            print('The only matching files are: %s' % files, file=sys.stderr)
4997            for f in files:
4998                print('Contents of %s:' % f)
4999                path = os.path.join(dn, f)
5000                with open(path, 'r') as tf:
5001                    print(tf.read())
5002        self.assertTrue(found, msg=msg)
5003
5004    def test_invalid(self):
5005        assertRaises = self.assertRaises
5006        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5007                     self.fn, 'X', delay=True)
5008        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5009                     self.fn, 'W', delay=True)
5010        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5011                     self.fn, 'W7', delay=True)
5012
5013    def test_compute_rollover_daily_attime(self):
5014        currentTime = 0
5015        atTime = datetime.time(12, 0, 0)
5016        rh = logging.handlers.TimedRotatingFileHandler(
5017            self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True,
5018            atTime=atTime)
5019        try:
5020            actual = rh.computeRollover(currentTime)
5021            self.assertEqual(actual, currentTime + 12 * 60 * 60)
5022
5023            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
5024            self.assertEqual(actual, currentTime + 36 * 60 * 60)
5025        finally:
5026            rh.close()
5027
5028    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
5029    def test_compute_rollover_weekly_attime(self):
5030        currentTime = int(time.time())
5031        today = currentTime - currentTime % 86400
5032
5033        atTime = datetime.time(12, 0, 0)
5034
5035        wday = time.gmtime(today).tm_wday
5036        for day in range(7):
5037            rh = logging.handlers.TimedRotatingFileHandler(
5038                self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True,
5039                atTime=atTime)
5040            try:
5041                if wday > day:
5042                    # The rollover day has already passed this week, so we
5043                    # go over into next week
5044                    expected = (7 - wday + day)
5045                else:
5046                    expected = (day - wday)
5047                # At this point expected is in days from now, convert to seconds
5048                expected *= 24 * 60 * 60
5049                # Add in the rollover time
5050                expected += 12 * 60 * 60
5051                # Add in adjustment for today
5052                expected += today
5053                actual = rh.computeRollover(today)
5054                if actual != expected:
5055                    print('failed in timezone: %d' % time.timezone)
5056                    print('local vars: %s' % locals())
5057                self.assertEqual(actual, expected)
5058                if day == wday:
5059                    # goes into following week
5060                    expected += 7 * 24 * 60 * 60
5061                actual = rh.computeRollover(today + 13 * 60 * 60)
5062                if actual != expected:
5063                    print('failed in timezone: %d' % time.timezone)
5064                    print('local vars: %s' % locals())
5065                self.assertEqual(actual, expected)
5066            finally:
5067                rh.close()
5068
5069
5070def secs(**kw):
5071    return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
5072
5073for when, exp in (('S', 1),
5074                  ('M', 60),
5075                  ('H', 60 * 60),
5076                  ('D', 60 * 60 * 24),
5077                  ('MIDNIGHT', 60 * 60 * 24),
5078                  # current time (epoch start) is a Thursday, W0 means Monday
5079                  ('W0', secs(days=4, hours=24)),
5080                 ):
5081    def test_compute_rollover(self, when=when, exp=exp):
5082        rh = logging.handlers.TimedRotatingFileHandler(
5083            self.fn, when=when, interval=1, backupCount=0, utc=True)
5084        currentTime = 0.0
5085        actual = rh.computeRollover(currentTime)
5086        if exp != actual:
5087            # Failures occur on some systems for MIDNIGHT and W0.
5088            # Print detailed calculation for MIDNIGHT so we can try to see
5089            # what's going on
5090            if when == 'MIDNIGHT':
5091                try:
5092                    if rh.utc:
5093                        t = time.gmtime(currentTime)
5094                    else:
5095                        t = time.localtime(currentTime)
5096                    currentHour = t[3]
5097                    currentMinute = t[4]
5098                    currentSecond = t[5]
5099                    # r is the number of seconds left between now and midnight
5100                    r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
5101                                                       currentMinute) * 60 +
5102                            currentSecond)
5103                    result = currentTime + r
5104                    print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
5105                    print('currentHour: %s' % currentHour, file=sys.stderr)
5106                    print('currentMinute: %s' % currentMinute, file=sys.stderr)
5107                    print('currentSecond: %s' % currentSecond, file=sys.stderr)
5108                    print('r: %s' % r, file=sys.stderr)
5109                    print('result: %s' % result, file=sys.stderr)
5110                except Exception:
5111                    print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
5112        self.assertEqual(exp, actual)
5113        rh.close()
5114    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
5115
5116
5117@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
5118class NTEventLogHandlerTest(BaseTest):
5119    def test_basic(self):
5120        logtype = 'Application'
5121        elh = win32evtlog.OpenEventLog(None, logtype)
5122        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
5123
5124        try:
5125            h = logging.handlers.NTEventLogHandler('test_logging')
5126        except pywintypes.error as e:
5127            if e.winerror == 5:  # access denied
5128                raise unittest.SkipTest('Insufficient privileges to run test')
5129            raise
5130
5131        r = logging.makeLogRecord({'msg': 'Test Log Message'})
5132        h.handle(r)
5133        h.close()
5134        # Now see if the event is recorded
5135        self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
5136        flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
5137                win32evtlog.EVENTLOG_SEQUENTIAL_READ
5138        found = False
5139        GO_BACK = 100
5140        events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
5141        for e in events:
5142            if e.SourceName != 'test_logging':
5143                continue
5144            msg = win32evtlogutil.SafeFormatMessage(e, logtype)
5145            if msg != 'Test Log Message\r\n':
5146                continue
5147            found = True
5148            break
5149        msg = 'Record not found in event log, went back %d records' % GO_BACK
5150        self.assertTrue(found, msg=msg)
5151
5152
5153class MiscTestCase(unittest.TestCase):
5154    def test__all__(self):
5155        blacklist = {'logThreads', 'logMultiprocessing',
5156                     'logProcesses', 'currentframe',
5157                     'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
5158                     'Filterer', 'PlaceHolder', 'Manager', 'RootLogger',
5159                     'root', 'threading'}
5160        support.check__all__(self, logging, blacklist=blacklist)
5161
5162
5163# Set the locale to the platform-dependent default.  I have no idea
5164# why the test does this, but in any case we save the current locale
5165# first and restore it at the end.
5166@support.run_with_locale('LC_ALL', '')
5167def test_main():
5168    tests = [
5169        BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
5170        HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
5171        DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
5172        ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest,
5173        StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
5174        QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
5175        LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest,
5176        RotatingFileHandlerTest,  LastResortTest, LogRecordTest,
5177        ExceptionTest, SysLogHandlerTest, IPv6SysLogHandlerTest, HTTPHandlerTest,
5178        NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
5179        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
5180        MiscTestCase
5181    ]
5182    if hasattr(logging.handlers, 'QueueListener'):
5183        tests.append(QueueListenerTest)
5184    support.run_unittest(*tests)
5185
5186if __name__ == "__main__":
5187    test_main()
5188