• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2001-2021 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2021 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import copy
29import datetime
30import pathlib
31import pickle
32import io
33import gc
34import json
35import os
36import queue
37import random
38import re
39import shutil
40import socket
41import struct
42import sys
43import tempfile
44from test.support.script_helper import assert_python_ok, assert_python_failure
45from test import support
46from test.support import os_helper
47from test.support import socket_helper
48from test.support import threading_helper
49from test.support import warnings_helper
50from test.support.logging_helper import TestHandler
51import textwrap
52import threading
53import time
54import unittest
55import warnings
56import weakref
57
58from http.server import HTTPServer, BaseHTTPRequestHandler
59from urllib.parse import urlparse, parse_qs
60from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
61                          ThreadingTCPServer, StreamRequestHandler)
62
63with warnings.catch_warnings():
64    warnings.simplefilter('ignore', DeprecationWarning)
65    import asyncore
66    import smtpd
67
68try:
69    import win32evtlog, win32evtlogutil, pywintypes
70except ImportError:
71    win32evtlog = win32evtlogutil = pywintypes = None
72
73try:
74    import zlib
75except ImportError:
76    pass
77
78class BaseTest(unittest.TestCase):
79
80    """Base class for logging tests."""
81
82    log_format = "%(name)s -> %(levelname)s: %(message)s"
83    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
84    message_num = 0
85
86    def setUp(self):
87        """Setup the default logging stream to an internal StringIO instance,
88        so that we can examine log output as we want."""
89        self._threading_key = threading_helper.threading_setup()
90
91        logger_dict = logging.getLogger().manager.loggerDict
92        logging._acquireLock()
93        try:
94            self.saved_handlers = logging._handlers.copy()
95            self.saved_handler_list = logging._handlerList[:]
96            self.saved_loggers = saved_loggers = logger_dict.copy()
97            self.saved_name_to_level = logging._nameToLevel.copy()
98            self.saved_level_to_name = logging._levelToName.copy()
99            self.logger_states = logger_states = {}
100            for name in saved_loggers:
101                logger_states[name] = getattr(saved_loggers[name],
102                                              'disabled', None)
103        finally:
104            logging._releaseLock()
105
106        # Set two unused loggers
107        self.logger1 = logging.getLogger("\xab\xd7\xbb")
108        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
109
110        self.root_logger = logging.getLogger("")
111        self.original_logging_level = self.root_logger.getEffectiveLevel()
112
113        self.stream = io.StringIO()
114        self.root_logger.setLevel(logging.DEBUG)
115        self.root_hdlr = logging.StreamHandler(self.stream)
116        self.root_formatter = logging.Formatter(self.log_format)
117        self.root_hdlr.setFormatter(self.root_formatter)
118        if self.logger1.hasHandlers():
119            hlist = self.logger1.handlers + self.root_logger.handlers
120            raise AssertionError('Unexpected handlers: %s' % hlist)
121        if self.logger2.hasHandlers():
122            hlist = self.logger2.handlers + self.root_logger.handlers
123            raise AssertionError('Unexpected handlers: %s' % hlist)
124        self.root_logger.addHandler(self.root_hdlr)
125        self.assertTrue(self.logger1.hasHandlers())
126        self.assertTrue(self.logger2.hasHandlers())
127
128    def tearDown(self):
129        """Remove our logging stream, and restore the original logging
130        level."""
131        self.stream.close()
132        self.root_logger.removeHandler(self.root_hdlr)
133        while self.root_logger.handlers:
134            h = self.root_logger.handlers[0]
135            self.root_logger.removeHandler(h)
136            h.close()
137        self.root_logger.setLevel(self.original_logging_level)
138        logging._acquireLock()
139        try:
140            logging._levelToName.clear()
141            logging._levelToName.update(self.saved_level_to_name)
142            logging._nameToLevel.clear()
143            logging._nameToLevel.update(self.saved_name_to_level)
144            logging._handlers.clear()
145            logging._handlers.update(self.saved_handlers)
146            logging._handlerList[:] = self.saved_handler_list
147            manager = logging.getLogger().manager
148            manager.disable = 0
149            loggerDict = manager.loggerDict
150            loggerDict.clear()
151            loggerDict.update(self.saved_loggers)
152            logger_states = self.logger_states
153            for name in self.logger_states:
154                if logger_states[name] is not None:
155                    self.saved_loggers[name].disabled = logger_states[name]
156        finally:
157            logging._releaseLock()
158
159        self.doCleanups()
160        threading_helper.threading_cleanup(*self._threading_key)
161
162    def assert_log_lines(self, expected_values, stream=None, pat=None):
163        """Match the collected log lines against the regular expression
164        self.expected_log_pat, and compare the extracted group values to
165        the expected_values list of tuples."""
166        stream = stream or self.stream
167        pat = re.compile(pat or self.expected_log_pat)
168        actual_lines = stream.getvalue().splitlines()
169        self.assertEqual(len(actual_lines), len(expected_values))
170        for actual, expected in zip(actual_lines, expected_values):
171            match = pat.search(actual)
172            if not match:
173                self.fail("Log line does not match expected pattern:\n" +
174                            actual)
175            self.assertEqual(tuple(match.groups()), expected)
176        s = stream.read()
177        if s:
178            self.fail("Remaining output at end of log stream:\n" + s)
179
180    def next_message(self):
181        """Generate a message consisting solely of an auto-incrementing
182        integer."""
183        self.message_num += 1
184        return "%d" % self.message_num
185
186
187class BuiltinLevelsTest(BaseTest):
188    """Test builtin levels and their inheritance."""
189
190    def test_flat(self):
191        # Logging levels in a flat logger namespace.
192        m = self.next_message
193
194        ERR = logging.getLogger("ERR")
195        ERR.setLevel(logging.ERROR)
196        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
197        INF.setLevel(logging.INFO)
198        DEB = logging.getLogger("DEB")
199        DEB.setLevel(logging.DEBUG)
200
201        # These should log.
202        ERR.log(logging.CRITICAL, m())
203        ERR.error(m())
204
205        INF.log(logging.CRITICAL, m())
206        INF.error(m())
207        INF.warning(m())
208        INF.info(m())
209
210        DEB.log(logging.CRITICAL, m())
211        DEB.error(m())
212        DEB.warning(m())
213        DEB.info(m())
214        DEB.debug(m())
215
216        # These should not log.
217        ERR.warning(m())
218        ERR.info(m())
219        ERR.debug(m())
220
221        INF.debug(m())
222
223        self.assert_log_lines([
224            ('ERR', 'CRITICAL', '1'),
225            ('ERR', 'ERROR', '2'),
226            ('INF', 'CRITICAL', '3'),
227            ('INF', 'ERROR', '4'),
228            ('INF', 'WARNING', '5'),
229            ('INF', 'INFO', '6'),
230            ('DEB', 'CRITICAL', '7'),
231            ('DEB', 'ERROR', '8'),
232            ('DEB', 'WARNING', '9'),
233            ('DEB', 'INFO', '10'),
234            ('DEB', 'DEBUG', '11'),
235        ])
236
237    def test_nested_explicit(self):
238        # Logging levels in a nested namespace, all explicitly set.
239        m = self.next_message
240
241        INF = logging.getLogger("INF")
242        INF.setLevel(logging.INFO)
243        INF_ERR  = logging.getLogger("INF.ERR")
244        INF_ERR.setLevel(logging.ERROR)
245
246        # These should log.
247        INF_ERR.log(logging.CRITICAL, m())
248        INF_ERR.error(m())
249
250        # These should not log.
251        INF_ERR.warning(m())
252        INF_ERR.info(m())
253        INF_ERR.debug(m())
254
255        self.assert_log_lines([
256            ('INF.ERR', 'CRITICAL', '1'),
257            ('INF.ERR', 'ERROR', '2'),
258        ])
259
260    def test_nested_inherited(self):
261        # Logging levels in a nested namespace, inherited from parent loggers.
262        m = self.next_message
263
264        INF = logging.getLogger("INF")
265        INF.setLevel(logging.INFO)
266        INF_ERR  = logging.getLogger("INF.ERR")
267        INF_ERR.setLevel(logging.ERROR)
268        INF_UNDEF = logging.getLogger("INF.UNDEF")
269        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
270        UNDEF = logging.getLogger("UNDEF")
271
272        # These should log.
273        INF_UNDEF.log(logging.CRITICAL, m())
274        INF_UNDEF.error(m())
275        INF_UNDEF.warning(m())
276        INF_UNDEF.info(m())
277        INF_ERR_UNDEF.log(logging.CRITICAL, m())
278        INF_ERR_UNDEF.error(m())
279
280        # These should not log.
281        INF_UNDEF.debug(m())
282        INF_ERR_UNDEF.warning(m())
283        INF_ERR_UNDEF.info(m())
284        INF_ERR_UNDEF.debug(m())
285
286        self.assert_log_lines([
287            ('INF.UNDEF', 'CRITICAL', '1'),
288            ('INF.UNDEF', 'ERROR', '2'),
289            ('INF.UNDEF', 'WARNING', '3'),
290            ('INF.UNDEF', 'INFO', '4'),
291            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
292            ('INF.ERR.UNDEF', 'ERROR', '6'),
293        ])
294
295    def test_nested_with_virtual_parent(self):
296        # Logging levels when some parent does not exist yet.
297        m = self.next_message
298
299        INF = logging.getLogger("INF")
300        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
301        CHILD = logging.getLogger("INF.BADPARENT")
302        INF.setLevel(logging.INFO)
303
304        # These should log.
305        GRANDCHILD.log(logging.FATAL, m())
306        GRANDCHILD.info(m())
307        CHILD.log(logging.FATAL, m())
308        CHILD.info(m())
309
310        # These should not log.
311        GRANDCHILD.debug(m())
312        CHILD.debug(m())
313
314        self.assert_log_lines([
315            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
316            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
317            ('INF.BADPARENT', 'CRITICAL', '3'),
318            ('INF.BADPARENT', 'INFO', '4'),
319        ])
320
321    def test_regression_22386(self):
322        """See issue #22386 for more information."""
323        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
324        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
325
326    def test_issue27935(self):
327        fatal = logging.getLevelName('FATAL')
328        self.assertEqual(fatal, logging.FATAL)
329
330    def test_regression_29220(self):
331        """See issue #29220 for more information."""
332        logging.addLevelName(logging.INFO, '')
333        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
334        self.assertEqual(logging.getLevelName(logging.INFO), '')
335        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
336        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
337
338class BasicFilterTest(BaseTest):
339
340    """Test the bundled Filter class."""
341
342    def test_filter(self):
343        # Only messages satisfying the specified criteria pass through the
344        #  filter.
345        filter_ = logging.Filter("spam.eggs")
346        handler = self.root_logger.handlers[0]
347        try:
348            handler.addFilter(filter_)
349            spam = logging.getLogger("spam")
350            spam_eggs = logging.getLogger("spam.eggs")
351            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
352            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
353
354            spam.info(self.next_message())
355            spam_eggs.info(self.next_message())  # Good.
356            spam_eggs_fish.info(self.next_message())  # Good.
357            spam_bakedbeans.info(self.next_message())
358
359            self.assert_log_lines([
360                ('spam.eggs', 'INFO', '2'),
361                ('spam.eggs.fish', 'INFO', '3'),
362            ])
363        finally:
364            handler.removeFilter(filter_)
365
366    def test_callable_filter(self):
367        # Only messages satisfying the specified criteria pass through the
368        #  filter.
369
370        def filterfunc(record):
371            parts = record.name.split('.')
372            prefix = '.'.join(parts[:2])
373            return prefix == 'spam.eggs'
374
375        handler = self.root_logger.handlers[0]
376        try:
377            handler.addFilter(filterfunc)
378            spam = logging.getLogger("spam")
379            spam_eggs = logging.getLogger("spam.eggs")
380            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
381            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
382
383            spam.info(self.next_message())
384            spam_eggs.info(self.next_message())  # Good.
385            spam_eggs_fish.info(self.next_message())  # Good.
386            spam_bakedbeans.info(self.next_message())
387
388            self.assert_log_lines([
389                ('spam.eggs', 'INFO', '2'),
390                ('spam.eggs.fish', 'INFO', '3'),
391            ])
392        finally:
393            handler.removeFilter(filterfunc)
394
395    def test_empty_filter(self):
396        f = logging.Filter()
397        r = logging.makeLogRecord({'name': 'spam.eggs'})
398        self.assertTrue(f.filter(r))
399
400#
401#   First, we define our levels. There can be as many as you want - the only
402#     limitations are that they should be integers, the lowest should be > 0 and
403#   larger values mean less information being logged. If you need specific
404#   level values which do not fit into these limitations, you can use a
405#   mapping dictionary to convert between your application levels and the
406#   logging system.
407#
408SILENT      = 120
409TACITURN    = 119
410TERSE       = 118
411EFFUSIVE    = 117
412SOCIABLE    = 116
413VERBOSE     = 115
414TALKATIVE   = 114
415GARRULOUS   = 113
416CHATTERBOX  = 112
417BORING      = 111
418
419LEVEL_RANGE = range(BORING, SILENT + 1)
420
421#
422#   Next, we define names for our levels. You don't need to do this - in which
423#   case the system will use "Level n" to denote the text for the level.
424#
425my_logging_levels = {
426    SILENT      : 'Silent',
427    TACITURN    : 'Taciturn',
428    TERSE       : 'Terse',
429    EFFUSIVE    : 'Effusive',
430    SOCIABLE    : 'Sociable',
431    VERBOSE     : 'Verbose',
432    TALKATIVE   : 'Talkative',
433    GARRULOUS   : 'Garrulous',
434    CHATTERBOX  : 'Chatterbox',
435    BORING      : 'Boring',
436}
437
438class GarrulousFilter(logging.Filter):
439
440    """A filter which blocks garrulous messages."""
441
442    def filter(self, record):
443        return record.levelno != GARRULOUS
444
445class VerySpecificFilter(logging.Filter):
446
447    """A filter which blocks sociable and taciturn messages."""
448
449    def filter(self, record):
450        return record.levelno not in [SOCIABLE, TACITURN]
451
452
453class CustomLevelsAndFiltersTest(BaseTest):
454
455    """Test various filtering possibilities with custom logging levels."""
456
457    # Skip the logger name group.
458    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
459
460    def setUp(self):
461        BaseTest.setUp(self)
462        for k, v in my_logging_levels.items():
463            logging.addLevelName(k, v)
464
465    def log_at_all_levels(self, logger):
466        for lvl in LEVEL_RANGE:
467            logger.log(lvl, self.next_message())
468
469    def test_logger_filter(self):
470        # Filter at logger level.
471        self.root_logger.setLevel(VERBOSE)
472        # Levels >= 'Verbose' are good.
473        self.log_at_all_levels(self.root_logger)
474        self.assert_log_lines([
475            ('Verbose', '5'),
476            ('Sociable', '6'),
477            ('Effusive', '7'),
478            ('Terse', '8'),
479            ('Taciturn', '9'),
480            ('Silent', '10'),
481        ])
482
483    def test_handler_filter(self):
484        # Filter at handler level.
485        self.root_logger.handlers[0].setLevel(SOCIABLE)
486        try:
487            # Levels >= 'Sociable' are good.
488            self.log_at_all_levels(self.root_logger)
489            self.assert_log_lines([
490                ('Sociable', '6'),
491                ('Effusive', '7'),
492                ('Terse', '8'),
493                ('Taciturn', '9'),
494                ('Silent', '10'),
495            ])
496        finally:
497            self.root_logger.handlers[0].setLevel(logging.NOTSET)
498
499    def test_specific_filters(self):
500        # Set a specific filter object on the handler, and then add another
501        #  filter object on the logger itself.
502        handler = self.root_logger.handlers[0]
503        specific_filter = None
504        garr = GarrulousFilter()
505        handler.addFilter(garr)
506        try:
507            self.log_at_all_levels(self.root_logger)
508            first_lines = [
509                # Notice how 'Garrulous' is missing
510                ('Boring', '1'),
511                ('Chatterbox', '2'),
512                ('Talkative', '4'),
513                ('Verbose', '5'),
514                ('Sociable', '6'),
515                ('Effusive', '7'),
516                ('Terse', '8'),
517                ('Taciturn', '9'),
518                ('Silent', '10'),
519            ]
520            self.assert_log_lines(first_lines)
521
522            specific_filter = VerySpecificFilter()
523            self.root_logger.addFilter(specific_filter)
524            self.log_at_all_levels(self.root_logger)
525            self.assert_log_lines(first_lines + [
526                # Not only 'Garrulous' is still missing, but also 'Sociable'
527                # and 'Taciturn'
528                ('Boring', '11'),
529                ('Chatterbox', '12'),
530                ('Talkative', '14'),
531                ('Verbose', '15'),
532                ('Effusive', '17'),
533                ('Terse', '18'),
534                ('Silent', '20'),
535        ])
536        finally:
537            if specific_filter:
538                self.root_logger.removeFilter(specific_filter)
539            handler.removeFilter(garr)
540
541
542class HandlerTest(BaseTest):
543    def test_name(self):
544        h = logging.Handler()
545        h.name = 'generic'
546        self.assertEqual(h.name, 'generic')
547        h.name = 'anothergeneric'
548        self.assertEqual(h.name, 'anothergeneric')
549        self.assertRaises(NotImplementedError, h.emit, None)
550
551    def test_builtin_handlers(self):
552        # We can't actually *use* too many handlers in the tests,
553        # but we can try instantiating them with various options
554        if sys.platform in ('linux', 'darwin'):
555            for existing in (True, False):
556                fd, fn = tempfile.mkstemp()
557                os.close(fd)
558                if not existing:
559                    os.unlink(fn)
560                h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True)
561                if existing:
562                    dev, ino = h.dev, h.ino
563                    self.assertEqual(dev, -1)
564                    self.assertEqual(ino, -1)
565                    r = logging.makeLogRecord({'msg': 'Test'})
566                    h.handle(r)
567                    # Now remove the file.
568                    os.unlink(fn)
569                    self.assertFalse(os.path.exists(fn))
570                    # The next call should recreate the file.
571                    h.handle(r)
572                    self.assertTrue(os.path.exists(fn))
573                else:
574                    self.assertEqual(h.dev, -1)
575                    self.assertEqual(h.ino, -1)
576                h.close()
577                if existing:
578                    os.unlink(fn)
579            if sys.platform == 'darwin':
580                sockname = '/var/run/syslog'
581            else:
582                sockname = '/dev/log'
583            try:
584                h = logging.handlers.SysLogHandler(sockname)
585                self.assertEqual(h.facility, h.LOG_USER)
586                self.assertTrue(h.unixsocket)
587                h.close()
588            except OSError: # syslogd might not be available
589                pass
590        for method in ('GET', 'POST', 'PUT'):
591            if method == 'PUT':
592                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
593                                  'localhost', '/log', method)
594            else:
595                h = logging.handlers.HTTPHandler('localhost', '/log', method)
596                h.close()
597        h = logging.handlers.BufferingHandler(0)
598        r = logging.makeLogRecord({})
599        self.assertTrue(h.shouldFlush(r))
600        h.close()
601        h = logging.handlers.BufferingHandler(1)
602        self.assertFalse(h.shouldFlush(r))
603        h.close()
604
605    def test_path_objects(self):
606        """
607        Test that Path objects are accepted as filename arguments to handlers.
608
609        See Issue #27493.
610        """
611        fd, fn = tempfile.mkstemp()
612        os.close(fd)
613        os.unlink(fn)
614        pfn = pathlib.Path(fn)
615        cases = (
616                    (logging.FileHandler, (pfn, 'w')),
617                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
618                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
619                )
620        if sys.platform in ('linux', 'darwin'):
621            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
622        for cls, args in cases:
623            h = cls(*args, encoding="utf-8")
624            self.assertTrue(os.path.exists(fn))
625            h.close()
626            os.unlink(fn)
627
628    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
629    def test_race(self):
630        # Issue #14632 refers.
631        def remove_loop(fname, tries):
632            for _ in range(tries):
633                try:
634                    os.unlink(fname)
635                    self.deletion_time = time.time()
636                except OSError:
637                    pass
638                time.sleep(0.004 * random.randint(0, 4))
639
640        del_count = 500
641        log_count = 500
642
643        self.handle_time = None
644        self.deletion_time = None
645
646        for delay in (False, True):
647            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
648            os.close(fd)
649            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
650            remover.daemon = True
651            remover.start()
652            h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay)
653            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
654            h.setFormatter(f)
655            try:
656                for _ in range(log_count):
657                    time.sleep(0.005)
658                    r = logging.makeLogRecord({'msg': 'testing' })
659                    try:
660                        self.handle_time = time.time()
661                        h.handle(r)
662                    except Exception:
663                        print('Deleted at %s, '
664                              'opened at %s' % (self.deletion_time,
665                                                self.handle_time))
666                        raise
667            finally:
668                remover.join()
669                h.close()
670                if os.path.exists(fn):
671                    os.unlink(fn)
672
673    # The implementation relies on os.register_at_fork existing, but we test
674    # based on os.fork existing because that is what users and this test use.
675    # This helps ensure that when fork exists (the important concept) that the
676    # register_at_fork mechanism is also present and used.
677    @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
678    def test_post_fork_child_no_deadlock(self):
679        """Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
680        class _OurHandler(logging.Handler):
681            def __init__(self):
682                super().__init__()
683                self.sub_handler = logging.StreamHandler(
684                    stream=open('/dev/null', 'wt', encoding='utf-8'))
685
686            def emit(self, record):
687                self.sub_handler.acquire()
688                try:
689                    self.sub_handler.emit(record)
690                finally:
691                    self.sub_handler.release()
692
693        self.assertEqual(len(logging._handlers), 0)
694        refed_h = _OurHandler()
695        self.addCleanup(refed_h.sub_handler.stream.close)
696        refed_h.name = 'because we need at least one for this test'
697        self.assertGreater(len(logging._handlers), 0)
698        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
699        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
700        test_logger.addHandler(refed_h)
701        test_logger.setLevel(logging.DEBUG)
702
703        locks_held__ready_to_fork = threading.Event()
704        fork_happened__release_locks_and_end_thread = threading.Event()
705
706        def lock_holder_thread_fn():
707            logging._acquireLock()
708            try:
709                refed_h.acquire()
710                try:
711                    # Tell the main thread to do the fork.
712                    locks_held__ready_to_fork.set()
713
714                    # If the deadlock bug exists, the fork will happen
715                    # without dealing with the locks we hold, deadlocking
716                    # the child.
717
718                    # Wait for a successful fork or an unreasonable amount of
719                    # time before releasing our locks.  To avoid a timing based
720                    # test we'd need communication from os.fork() as to when it
721                    # has actually happened.  Given this is a regression test
722                    # for a fixed issue, potentially less reliably detecting
723                    # regression via timing is acceptable for simplicity.
724                    # The test will always take at least this long. :(
725                    fork_happened__release_locks_and_end_thread.wait(0.5)
726                finally:
727                    refed_h.release()
728            finally:
729                logging._releaseLock()
730
731        lock_holder_thread = threading.Thread(
732                target=lock_holder_thread_fn,
733                name='test_post_fork_child_no_deadlock lock holder')
734        lock_holder_thread.start()
735
736        locks_held__ready_to_fork.wait()
737        pid = os.fork()
738        if pid == 0:
739            # Child process
740            try:
741                test_logger.info(r'Child process did not deadlock. \o/')
742            finally:
743                os._exit(0)
744        else:
745            # Parent process
746            test_logger.info(r'Parent process returned from fork. \o/')
747            fork_happened__release_locks_and_end_thread.set()
748            lock_holder_thread.join()
749
750            support.wait_process(pid, exitcode=0)
751
752
753class BadStream(object):
754    def write(self, data):
755        raise RuntimeError('deliberate mistake')
756
757class TestStreamHandler(logging.StreamHandler):
758    def handleError(self, record):
759        self.error_record = record
760
761class StreamWithIntName(object):
762    level = logging.NOTSET
763    name = 2
764
765class StreamHandlerTest(BaseTest):
766    def test_error_handling(self):
767        h = TestStreamHandler(BadStream())
768        r = logging.makeLogRecord({})
769        old_raise = logging.raiseExceptions
770
771        try:
772            h.handle(r)
773            self.assertIs(h.error_record, r)
774
775            h = logging.StreamHandler(BadStream())
776            with support.captured_stderr() as stderr:
777                h.handle(r)
778                msg = '\nRuntimeError: deliberate mistake\n'
779                self.assertIn(msg, stderr.getvalue())
780
781            logging.raiseExceptions = False
782            with support.captured_stderr() as stderr:
783                h.handle(r)
784                self.assertEqual('', stderr.getvalue())
785        finally:
786            logging.raiseExceptions = old_raise
787
788    def test_stream_setting(self):
789        """
790        Test setting the handler's stream
791        """
792        h = logging.StreamHandler()
793        stream = io.StringIO()
794        old = h.setStream(stream)
795        self.assertIs(old, sys.stderr)
796        actual = h.setStream(old)
797        self.assertIs(actual, stream)
798        # test that setting to existing value returns None
799        actual = h.setStream(old)
800        self.assertIsNone(actual)
801
802    def test_can_represent_stream_with_int_name(self):
803        h = logging.StreamHandler(StreamWithIntName())
804        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')
805
806# -- The following section could be moved into a server_helper.py module
807# -- if it proves to be of wider utility than just test_logging
808
809class TestSMTPServer(smtpd.SMTPServer):
810    """
811    This class implements a test SMTP server.
812
813    :param addr: A (host, port) tuple which the server listens on.
814                 You can specify a port value of zero: the server's
815                 *port* attribute will hold the actual port number
816                 used, which can be used in client connections.
817    :param handler: A callable which will be called to process
818                    incoming messages. The handler will be passed
819                    the client address tuple, who the message is from,
820                    a list of recipients and the message data.
821    :param poll_interval: The interval, in seconds, used in the underlying
822                          :func:`select` or :func:`poll` call by
823                          :func:`asyncore.loop`.
824    :param sockmap: A dictionary which will be used to hold
825                    :class:`asyncore.dispatcher` instances used by
826                    :func:`asyncore.loop`. This avoids changing the
827                    :mod:`asyncore` module's global state.
828    """
829
830    def __init__(self, addr, handler, poll_interval, sockmap):
831        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
832                                  decode_data=True)
833        self.port = self.socket.getsockname()[1]
834        self._handler = handler
835        self._thread = None
836        self._quit = False
837        self.poll_interval = poll_interval
838
839    def process_message(self, peer, mailfrom, rcpttos, data):
840        """
841        Delegates to the handler passed in to the server's constructor.
842
843        Typically, this will be a test case method.
844        :param peer: The client (host, port) tuple.
845        :param mailfrom: The address of the sender.
846        :param rcpttos: The addresses of the recipients.
847        :param data: The message.
848        """
849        self._handler(peer, mailfrom, rcpttos, data)
850
851    def start(self):
852        """
853        Start the server running on a separate daemon thread.
854        """
855        self._thread = t = threading.Thread(target=self.serve_forever,
856                                            args=(self.poll_interval,))
857        t.daemon = True
858        t.start()
859
860    def serve_forever(self, poll_interval):
861        """
862        Run the :mod:`asyncore` loop until normal termination
863        conditions arise.
864        :param poll_interval: The interval, in seconds, used in the underlying
865                              :func:`select` or :func:`poll` call by
866                              :func:`asyncore.loop`.
867        """
868        while not self._quit:
869            asyncore.loop(poll_interval, map=self._map, count=1)
870
871    def stop(self):
872        """
873        Stop the thread by closing the server instance.
874        Wait for the server thread to terminate.
875        """
876        self._quit = True
877        threading_helper.join_thread(self._thread)
878        self._thread = None
879        self.close()
880        asyncore.close_all(map=self._map, ignore_all=True)
881
882
883class ControlMixin(object):
884    """
885    This mixin is used to start a server on a separate thread, and
886    shut it down programmatically. Request handling is simplified - instead
887    of needing to derive a suitable RequestHandler subclass, you just
888    provide a callable which will be passed each received request to be
889    processed.
890
891    :param handler: A handler callable which will be called with a
892                    single parameter - the request - in order to
893                    process the request. This handler is called on the
894                    server thread, effectively meaning that requests are
895                    processed serially. While not quite web scale ;-),
896                    this should be fine for testing applications.
897    :param poll_interval: The polling interval in seconds.
898    """
899    def __init__(self, handler, poll_interval):
900        self._thread = None
901        self.poll_interval = poll_interval
902        self._handler = handler
903        self.ready = threading.Event()
904
905    def start(self):
906        """
907        Create a daemon thread to run the server, and start it.
908        """
909        self._thread = t = threading.Thread(target=self.serve_forever,
910                                            args=(self.poll_interval,))
911        t.daemon = True
912        t.start()
913
914    def serve_forever(self, poll_interval):
915        """
916        Run the server. Set the ready flag before entering the
917        service loop.
918        """
919        self.ready.set()
920        super(ControlMixin, self).serve_forever(poll_interval)
921
922    def stop(self):
923        """
924        Tell the server thread to stop, and wait for it to do so.
925        """
926        self.shutdown()
927        if self._thread is not None:
928            threading_helper.join_thread(self._thread)
929            self._thread = None
930        self.server_close()
931        self.ready.clear()
932
933class TestHTTPServer(ControlMixin, HTTPServer):
934    """
935    An HTTP server which is controllable using :class:`ControlMixin`.
936
937    :param addr: A tuple with the IP address and port to listen on.
938    :param handler: A handler callable which will be called with a
939                    single parameter - the request - in order to
940                    process the request.
941    :param poll_interval: The polling interval in seconds.
942    :param log: Pass ``True`` to enable log messages.
943    """
944    def __init__(self, addr, handler, poll_interval=0.5,
945                 log=False, sslctx=None):
946        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
947            def __getattr__(self, name, default=None):
948                if name.startswith('do_'):
949                    return self.process_request
950                raise AttributeError(name)
951
952            def process_request(self):
953                self.server._handler(self)
954
955            def log_message(self, format, *args):
956                if log:
957                    super(DelegatingHTTPRequestHandler,
958                          self).log_message(format, *args)
959        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
960        ControlMixin.__init__(self, handler, poll_interval)
961        self.sslctx = sslctx
962
963    def get_request(self):
964        try:
965            sock, addr = self.socket.accept()
966            if self.sslctx:
967                sock = self.sslctx.wrap_socket(sock, server_side=True)
968        except OSError as e:
969            # socket errors are silenced by the caller, print them here
970            sys.stderr.write("Got an error:\n%s\n" % e)
971            raise
972        return sock, addr
973
974class TestTCPServer(ControlMixin, ThreadingTCPServer):
975    """
976    A TCP server which is controllable using :class:`ControlMixin`.
977
978    :param addr: A tuple with the IP address and port to listen on.
979    :param handler: A handler callable which will be called with a single
980                    parameter - the request - in order to process the request.
981    :param poll_interval: The polling interval in seconds.
982    :bind_and_activate: If True (the default), binds the server and starts it
983                        listening. If False, you need to call
984                        :meth:`server_bind` and :meth:`server_activate` at
985                        some later time before calling :meth:`start`, so that
986                        the server will set up the socket and listen on it.
987    """
988
989    allow_reuse_address = True
990
991    def __init__(self, addr, handler, poll_interval=0.5,
992                 bind_and_activate=True):
993        class DelegatingTCPRequestHandler(StreamRequestHandler):
994
995            def handle(self):
996                self.server._handler(self)
997        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
998                                    bind_and_activate)
999        ControlMixin.__init__(self, handler, poll_interval)
1000
1001    def server_bind(self):
1002        super(TestTCPServer, self).server_bind()
1003        self.port = self.socket.getsockname()[1]
1004
1005class TestUDPServer(ControlMixin, ThreadingUDPServer):
1006    """
1007    A UDP server which is controllable using :class:`ControlMixin`.
1008
1009    :param addr: A tuple with the IP address and port to listen on.
1010    :param handler: A handler callable which will be called with a
1011                    single parameter - the request - in order to
1012                    process the request.
1013    :param poll_interval: The polling interval for shutdown requests,
1014                          in seconds.
1015    :bind_and_activate: If True (the default), binds the server and
1016                        starts it listening. If False, you need to
1017                        call :meth:`server_bind` and
1018                        :meth:`server_activate` at some later time
1019                        before calling :meth:`start`, so that the server will
1020                        set up the socket and listen on it.
1021    """
1022    def __init__(self, addr, handler, poll_interval=0.5,
1023                 bind_and_activate=True):
1024        class DelegatingUDPRequestHandler(DatagramRequestHandler):
1025
1026            def handle(self):
1027                self.server._handler(self)
1028
1029            def finish(self):
1030                data = self.wfile.getvalue()
1031                if data:
1032                    try:
1033                        super(DelegatingUDPRequestHandler, self).finish()
1034                    except OSError:
1035                        if not self.server._closed:
1036                            raise
1037
1038        ThreadingUDPServer.__init__(self, addr,
1039                                    DelegatingUDPRequestHandler,
1040                                    bind_and_activate)
1041        ControlMixin.__init__(self, handler, poll_interval)
1042        self._closed = False
1043
1044    def server_bind(self):
1045        super(TestUDPServer, self).server_bind()
1046        self.port = self.socket.getsockname()[1]
1047
1048    def server_close(self):
1049        super(TestUDPServer, self).server_close()
1050        self._closed = True
1051
1052if hasattr(socket, "AF_UNIX"):
1053    class TestUnixStreamServer(TestTCPServer):
1054        address_family = socket.AF_UNIX
1055
1056    class TestUnixDatagramServer(TestUDPServer):
1057        address_family = socket.AF_UNIX
1058
1059# - end of server_helper section
1060
1061class SMTPHandlerTest(BaseTest):
1062    # bpo-14314, bpo-19665, bpo-34092: don't wait forever
1063    TIMEOUT = support.LONG_TIMEOUT
1064
1065    def test_basic(self):
1066        sockmap = {}
1067        server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
1068                                sockmap)
1069        server.start()
1070        addr = (socket_helper.HOST, server.port)
1071        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
1072                                         timeout=self.TIMEOUT)
1073        self.assertEqual(h.toaddrs, ['you'])
1074        self.messages = []
1075        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
1076        self.handled = threading.Event()
1077        h.handle(r)
1078        self.handled.wait(self.TIMEOUT)
1079        server.stop()
1080        self.assertTrue(self.handled.is_set())
1081        self.assertEqual(len(self.messages), 1)
1082        peer, mailfrom, rcpttos, data = self.messages[0]
1083        self.assertEqual(mailfrom, 'me')
1084        self.assertEqual(rcpttos, ['you'])
1085        self.assertIn('\nSubject: Log\n', data)
1086        self.assertTrue(data.endswith('\n\nHello \u2713'))
1087        h.close()
1088
1089    def process_message(self, *args):
1090        self.messages.append(args)
1091        self.handled.set()
1092
1093class MemoryHandlerTest(BaseTest):
1094
1095    """Tests for the MemoryHandler."""
1096
1097    # Do not bother with a logger name group.
1098    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
1099
1100    def setUp(self):
1101        BaseTest.setUp(self)
1102        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1103                                                       self.root_hdlr)
1104        self.mem_logger = logging.getLogger('mem')
1105        self.mem_logger.propagate = 0
1106        self.mem_logger.addHandler(self.mem_hdlr)
1107
1108    def tearDown(self):
1109        self.mem_hdlr.close()
1110        BaseTest.tearDown(self)
1111
1112    def test_flush(self):
1113        # The memory handler flushes to its target handler based on specific
1114        #  criteria (message count and message level).
1115        self.mem_logger.debug(self.next_message())
1116        self.assert_log_lines([])
1117        self.mem_logger.info(self.next_message())
1118        self.assert_log_lines([])
1119        # This will flush because the level is >= logging.WARNING
1120        self.mem_logger.warning(self.next_message())
1121        lines = [
1122            ('DEBUG', '1'),
1123            ('INFO', '2'),
1124            ('WARNING', '3'),
1125        ]
1126        self.assert_log_lines(lines)
1127        for n in (4, 14):
1128            for i in range(9):
1129                self.mem_logger.debug(self.next_message())
1130            self.assert_log_lines(lines)
1131            # This will flush because it's the 10th message since the last
1132            #  flush.
1133            self.mem_logger.debug(self.next_message())
1134            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
1135            self.assert_log_lines(lines)
1136
1137        self.mem_logger.debug(self.next_message())
1138        self.assert_log_lines(lines)
1139
1140    def test_flush_on_close(self):
1141        """
1142        Test that the flush-on-close configuration works as expected.
1143        """
1144        self.mem_logger.debug(self.next_message())
1145        self.assert_log_lines([])
1146        self.mem_logger.info(self.next_message())
1147        self.assert_log_lines([])
1148        self.mem_logger.removeHandler(self.mem_hdlr)
1149        # Default behaviour is to flush on close. Check that it happens.
1150        self.mem_hdlr.close()
1151        lines = [
1152            ('DEBUG', '1'),
1153            ('INFO', '2'),
1154        ]
1155        self.assert_log_lines(lines)
1156        # Now configure for flushing not to be done on close.
1157        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1158                                                       self.root_hdlr,
1159                                                       False)
1160        self.mem_logger.addHandler(self.mem_hdlr)
1161        self.mem_logger.debug(self.next_message())
1162        self.assert_log_lines(lines)  # no change
1163        self.mem_logger.info(self.next_message())
1164        self.assert_log_lines(lines)  # no change
1165        self.mem_logger.removeHandler(self.mem_hdlr)
1166        self.mem_hdlr.close()
1167        # assert that no new lines have been added
1168        self.assert_log_lines(lines)  # no change
1169
1170    def test_race_between_set_target_and_flush(self):
1171        class MockRaceConditionHandler:
1172            def __init__(self, mem_hdlr):
1173                self.mem_hdlr = mem_hdlr
1174                self.threads = []
1175
1176            def removeTarget(self):
1177                self.mem_hdlr.setTarget(None)
1178
1179            def handle(self, msg):
1180                thread = threading.Thread(target=self.removeTarget)
1181                self.threads.append(thread)
1182                thread.start()
1183
1184        target = MockRaceConditionHandler(self.mem_hdlr)
1185        try:
1186            self.mem_hdlr.setTarget(target)
1187
1188            for _ in range(10):
1189                time.sleep(0.005)
1190                self.mem_logger.info("not flushed")
1191                self.mem_logger.warning("flushed")
1192        finally:
1193            for thread in target.threads:
1194                threading_helper.join_thread(thread)
1195
1196
1197class ExceptionFormatter(logging.Formatter):
1198    """A special exception formatter."""
1199    def formatException(self, ei):
1200        return "Got a [%s]" % ei[0].__name__
1201
1202
1203class ConfigFileTest(BaseTest):
1204
1205    """Reading logging config from a .ini-style config file."""
1206
1207    check_no_resource_warning = warnings_helper.check_no_resource_warning
1208    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1209
1210    # config0 is a standard configuration.
1211    config0 = """
1212    [loggers]
1213    keys=root
1214
1215    [handlers]
1216    keys=hand1
1217
1218    [formatters]
1219    keys=form1
1220
1221    [logger_root]
1222    level=WARNING
1223    handlers=hand1
1224
1225    [handler_hand1]
1226    class=StreamHandler
1227    level=NOTSET
1228    formatter=form1
1229    args=(sys.stdout,)
1230
1231    [formatter_form1]
1232    format=%(levelname)s ++ %(message)s
1233    datefmt=
1234    """
1235
1236    # config1 adds a little to the standard configuration.
1237    config1 = """
1238    [loggers]
1239    keys=root,parser
1240
1241    [handlers]
1242    keys=hand1
1243
1244    [formatters]
1245    keys=form1
1246
1247    [logger_root]
1248    level=WARNING
1249    handlers=
1250
1251    [logger_parser]
1252    level=DEBUG
1253    handlers=hand1
1254    propagate=1
1255    qualname=compiler.parser
1256
1257    [handler_hand1]
1258    class=StreamHandler
1259    level=NOTSET
1260    formatter=form1
1261    args=(sys.stdout,)
1262
1263    [formatter_form1]
1264    format=%(levelname)s ++ %(message)s
1265    datefmt=
1266    """
1267
1268    # config1a moves the handler to the root.
1269    config1a = """
1270    [loggers]
1271    keys=root,parser
1272
1273    [handlers]
1274    keys=hand1
1275
1276    [formatters]
1277    keys=form1
1278
1279    [logger_root]
1280    level=WARNING
1281    handlers=hand1
1282
1283    [logger_parser]
1284    level=DEBUG
1285    handlers=
1286    propagate=1
1287    qualname=compiler.parser
1288
1289    [handler_hand1]
1290    class=StreamHandler
1291    level=NOTSET
1292    formatter=form1
1293    args=(sys.stdout,)
1294
1295    [formatter_form1]
1296    format=%(levelname)s ++ %(message)s
1297    datefmt=
1298    """
1299
1300    # config2 has a subtle configuration error that should be reported
1301    config2 = config1.replace("sys.stdout", "sys.stbout")
1302
1303    # config3 has a less subtle configuration error
1304    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1305
1306    # config4 specifies a custom formatter class to be loaded
1307    config4 = """
1308    [loggers]
1309    keys=root
1310
1311    [handlers]
1312    keys=hand1
1313
1314    [formatters]
1315    keys=form1
1316
1317    [logger_root]
1318    level=NOTSET
1319    handlers=hand1
1320
1321    [handler_hand1]
1322    class=StreamHandler
1323    level=NOTSET
1324    formatter=form1
1325    args=(sys.stdout,)
1326
1327    [formatter_form1]
1328    class=""" + __name__ + """.ExceptionFormatter
1329    format=%(levelname)s:%(name)s:%(message)s
1330    datefmt=
1331    """
1332
1333    # config5 specifies a custom handler class to be loaded
1334    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1335
1336    # config6 uses ', ' delimiters in the handlers and formatters sections
1337    config6 = """
1338    [loggers]
1339    keys=root,parser
1340
1341    [handlers]
1342    keys=hand1, hand2
1343
1344    [formatters]
1345    keys=form1, form2
1346
1347    [logger_root]
1348    level=WARNING
1349    handlers=
1350
1351    [logger_parser]
1352    level=DEBUG
1353    handlers=hand1
1354    propagate=1
1355    qualname=compiler.parser
1356
1357    [handler_hand1]
1358    class=StreamHandler
1359    level=NOTSET
1360    formatter=form1
1361    args=(sys.stdout,)
1362
1363    [handler_hand2]
1364    class=StreamHandler
1365    level=NOTSET
1366    formatter=form1
1367    args=(sys.stderr,)
1368
1369    [formatter_form1]
1370    format=%(levelname)s ++ %(message)s
1371    datefmt=
1372
1373    [formatter_form2]
1374    format=%(message)s
1375    datefmt=
1376    """
1377
1378    # config7 adds a compiler logger, and uses kwargs instead of args.
1379    config7 = """
1380    [loggers]
1381    keys=root,parser,compiler
1382
1383    [handlers]
1384    keys=hand1
1385
1386    [formatters]
1387    keys=form1
1388
1389    [logger_root]
1390    level=WARNING
1391    handlers=hand1
1392
1393    [logger_compiler]
1394    level=DEBUG
1395    handlers=
1396    propagate=1
1397    qualname=compiler
1398
1399    [logger_parser]
1400    level=DEBUG
1401    handlers=
1402    propagate=1
1403    qualname=compiler.parser
1404
1405    [handler_hand1]
1406    class=StreamHandler
1407    level=NOTSET
1408    formatter=form1
1409    kwargs={'stream': sys.stdout,}
1410
1411    [formatter_form1]
1412    format=%(levelname)s ++ %(message)s
1413    datefmt=
1414    """
1415
1416    # config 8, check for resource warning
1417    config8 = r"""
1418    [loggers]
1419    keys=root
1420
1421    [handlers]
1422    keys=file
1423
1424    [formatters]
1425    keys=
1426
1427    [logger_root]
1428    level=DEBUG
1429    handlers=file
1430
1431    [handler_file]
1432    class=FileHandler
1433    level=DEBUG
1434    args=("{tempfile}",)
1435    kwargs={{"encoding": "utf-8"}}
1436    """
1437
1438    disable_test = """
1439    [loggers]
1440    keys=root
1441
1442    [handlers]
1443    keys=screen
1444
1445    [formatters]
1446    keys=
1447
1448    [logger_root]
1449    level=DEBUG
1450    handlers=screen
1451
1452    [handler_screen]
1453    level=DEBUG
1454    class=StreamHandler
1455    args=(sys.stdout,)
1456    formatter=
1457    """
1458
1459    def apply_config(self, conf, **kwargs):
1460        file = io.StringIO(textwrap.dedent(conf))
1461        logging.config.fileConfig(file, encoding="utf-8", **kwargs)
1462
1463    def test_config0_ok(self):
1464        # A simple config file which overrides the default settings.
1465        with support.captured_stdout() as output:
1466            self.apply_config(self.config0)
1467            logger = logging.getLogger()
1468            # Won't output anything
1469            logger.info(self.next_message())
1470            # Outputs a message
1471            logger.error(self.next_message())
1472            self.assert_log_lines([
1473                ('ERROR', '2'),
1474            ], stream=output)
1475            # Original logger output is empty.
1476            self.assert_log_lines([])
1477
1478    def test_config0_using_cp_ok(self):
1479        # A simple config file which overrides the default settings.
1480        with support.captured_stdout() as output:
1481            file = io.StringIO(textwrap.dedent(self.config0))
1482            cp = configparser.ConfigParser()
1483            cp.read_file(file)
1484            logging.config.fileConfig(cp)
1485            logger = logging.getLogger()
1486            # Won't output anything
1487            logger.info(self.next_message())
1488            # Outputs a message
1489            logger.error(self.next_message())
1490            self.assert_log_lines([
1491                ('ERROR', '2'),
1492            ], stream=output)
1493            # Original logger output is empty.
1494            self.assert_log_lines([])
1495
1496    def test_config1_ok(self, config=config1):
1497        # A config file defining a sub-parser as well.
1498        with support.captured_stdout() as output:
1499            self.apply_config(config)
1500            logger = logging.getLogger("compiler.parser")
1501            # Both will output a message
1502            logger.info(self.next_message())
1503            logger.error(self.next_message())
1504            self.assert_log_lines([
1505                ('INFO', '1'),
1506                ('ERROR', '2'),
1507            ], stream=output)
1508            # Original logger output is empty.
1509            self.assert_log_lines([])
1510
1511    def test_config2_failure(self):
1512        # A simple config file which overrides the default settings.
1513        self.assertRaises(Exception, self.apply_config, self.config2)
1514
1515    def test_config3_failure(self):
1516        # A simple config file which overrides the default settings.
1517        self.assertRaises(Exception, self.apply_config, self.config3)
1518
1519    def test_config4_ok(self):
1520        # A config file specifying a custom formatter class.
1521        with support.captured_stdout() as output:
1522            self.apply_config(self.config4)
1523            logger = logging.getLogger()
1524            try:
1525                raise RuntimeError()
1526            except RuntimeError:
1527                logging.exception("just testing")
1528            sys.stdout.seek(0)
1529            self.assertEqual(output.getvalue(),
1530                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1531            # Original logger output is empty
1532            self.assert_log_lines([])
1533
1534    def test_config5_ok(self):
1535        self.test_config1_ok(config=self.config5)
1536
1537    def test_config6_ok(self):
1538        self.test_config1_ok(config=self.config6)
1539
1540    def test_config7_ok(self):
1541        with support.captured_stdout() as output:
1542            self.apply_config(self.config1a)
1543            logger = logging.getLogger("compiler.parser")
1544            # See issue #11424. compiler-hyphenated sorts
1545            # between compiler and compiler.xyz and this
1546            # was preventing compiler.xyz from being included
1547            # in the child loggers of compiler because of an
1548            # overzealous loop termination condition.
1549            hyphenated = logging.getLogger('compiler-hyphenated')
1550            # All will output a message
1551            logger.info(self.next_message())
1552            logger.error(self.next_message())
1553            hyphenated.critical(self.next_message())
1554            self.assert_log_lines([
1555                ('INFO', '1'),
1556                ('ERROR', '2'),
1557                ('CRITICAL', '3'),
1558            ], stream=output)
1559            # Original logger output is empty.
1560            self.assert_log_lines([])
1561        with support.captured_stdout() as output:
1562            self.apply_config(self.config7)
1563            logger = logging.getLogger("compiler.parser")
1564            self.assertFalse(logger.disabled)
1565            # Both will output a message
1566            logger.info(self.next_message())
1567            logger.error(self.next_message())
1568            logger = logging.getLogger("compiler.lexer")
1569            # Both will output a message
1570            logger.info(self.next_message())
1571            logger.error(self.next_message())
1572            # Will not appear
1573            hyphenated.critical(self.next_message())
1574            self.assert_log_lines([
1575                ('INFO', '4'),
1576                ('ERROR', '5'),
1577                ('INFO', '6'),
1578                ('ERROR', '7'),
1579            ], stream=output)
1580            # Original logger output is empty.
1581            self.assert_log_lines([])
1582
1583    def test_config8_ok(self):
1584
1585        def cleanup(h1, fn):
1586            h1.close()
1587            os.remove(fn)
1588
1589        with self.check_no_resource_warning():
1590            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
1591            os.close(fd)
1592
1593            # Replace single backslash with double backslash in windows
1594            # to avoid unicode error during string formatting
1595            if os.name == "nt":
1596                fn = fn.replace("\\", "\\\\")
1597
1598            config8 = self.config8.format(tempfile=fn)
1599
1600            self.apply_config(config8)
1601            self.apply_config(config8)
1602
1603        handler = logging.root.handlers[0]
1604        self.addCleanup(cleanup, handler, fn)
1605
1606    def test_logger_disabling(self):
1607        self.apply_config(self.disable_test)
1608        logger = logging.getLogger('some_pristine_logger')
1609        self.assertFalse(logger.disabled)
1610        self.apply_config(self.disable_test)
1611        self.assertTrue(logger.disabled)
1612        self.apply_config(self.disable_test, disable_existing_loggers=False)
1613        self.assertFalse(logger.disabled)
1614
1615    def test_config_set_handler_names(self):
1616        test_config = """
1617            [loggers]
1618            keys=root
1619
1620            [handlers]
1621            keys=hand1
1622
1623            [formatters]
1624            keys=form1
1625
1626            [logger_root]
1627            handlers=hand1
1628
1629            [handler_hand1]
1630            class=StreamHandler
1631            formatter=form1
1632
1633            [formatter_form1]
1634            format=%(levelname)s ++ %(message)s
1635            """
1636        self.apply_config(test_config)
1637        self.assertEqual(logging.getLogger().handlers[0].name, 'hand1')
1638
1639    def test_defaults_do_no_interpolation(self):
1640        """bpo-33802 defaults should not get interpolated"""
1641        ini = textwrap.dedent("""
1642            [formatters]
1643            keys=default
1644
1645            [formatter_default]
1646
1647            [handlers]
1648            keys=console
1649
1650            [handler_console]
1651            class=logging.StreamHandler
1652            args=tuple()
1653
1654            [loggers]
1655            keys=root
1656
1657            [logger_root]
1658            formatter=default
1659            handlers=console
1660            """).strip()
1661        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
1662        try:
1663            os.write(fd, ini.encode('ascii'))
1664            os.close(fd)
1665            logging.config.fileConfig(
1666                fn,
1667                encoding="utf-8",
1668                defaults=dict(
1669                    version=1,
1670                    disable_existing_loggers=False,
1671                    formatters={
1672                        "generic": {
1673                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
1674                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
1675                            "class": "logging.Formatter"
1676                        },
1677                    },
1678                )
1679            )
1680        finally:
1681            os.unlink(fn)
1682
1683
1684class SocketHandlerTest(BaseTest):
1685
1686    """Test for SocketHandler objects."""
1687
1688    server_class = TestTCPServer
1689    address = ('localhost', 0)
1690
1691    def setUp(self):
1692        """Set up a TCP server to receive log messages, and a SocketHandler
1693        pointing to that server's address and port."""
1694        BaseTest.setUp(self)
1695        # Issue #29177: deal with errors that happen during setup
1696        self.server = self.sock_hdlr = self.server_exception = None
1697        try:
1698            self.server = server = self.server_class(self.address,
1699                                                     self.handle_socket, 0.01)
1700            server.start()
1701            # Uncomment next line to test error recovery in setUp()
1702            # raise OSError('dummy error raised')
1703        except OSError as e:
1704            self.server_exception = e
1705            return
1706        server.ready.wait()
1707        hcls = logging.handlers.SocketHandler
1708        if isinstance(server.server_address, tuple):
1709            self.sock_hdlr = hcls('localhost', server.port)
1710        else:
1711            self.sock_hdlr = hcls(server.server_address, None)
1712        self.log_output = ''
1713        self.root_logger.removeHandler(self.root_logger.handlers[0])
1714        self.root_logger.addHandler(self.sock_hdlr)
1715        self.handled = threading.Semaphore(0)
1716
1717    def tearDown(self):
1718        """Shutdown the TCP server."""
1719        try:
1720            if self.sock_hdlr:
1721                self.root_logger.removeHandler(self.sock_hdlr)
1722                self.sock_hdlr.close()
1723            if self.server:
1724                self.server.stop()
1725        finally:
1726            BaseTest.tearDown(self)
1727
1728    def handle_socket(self, request):
1729        conn = request.connection
1730        while True:
1731            chunk = conn.recv(4)
1732            if len(chunk) < 4:
1733                break
1734            slen = struct.unpack(">L", chunk)[0]
1735            chunk = conn.recv(slen)
1736            while len(chunk) < slen:
1737                chunk = chunk + conn.recv(slen - len(chunk))
1738            obj = pickle.loads(chunk)
1739            record = logging.makeLogRecord(obj)
1740            self.log_output += record.msg + '\n'
1741            self.handled.release()
1742
1743    def test_output(self):
1744        # The log message sent to the SocketHandler is properly received.
1745        if self.server_exception:
1746            self.skipTest(self.server_exception)
1747        logger = logging.getLogger("tcp")
1748        logger.error("spam")
1749        self.handled.acquire()
1750        logger.debug("eggs")
1751        self.handled.acquire()
1752        self.assertEqual(self.log_output, "spam\neggs\n")
1753
1754    def test_noserver(self):
1755        if self.server_exception:
1756            self.skipTest(self.server_exception)
1757        # Avoid timing-related failures due to SocketHandler's own hard-wired
1758        # one-second timeout on socket.create_connection() (issue #16264).
1759        self.sock_hdlr.retryStart = 2.5
1760        # Kill the server
1761        self.server.stop()
1762        # The logging call should try to connect, which should fail
1763        try:
1764            raise RuntimeError('Deliberate mistake')
1765        except RuntimeError:
1766            self.root_logger.exception('Never sent')
1767        self.root_logger.error('Never sent, either')
1768        now = time.time()
1769        self.assertGreater(self.sock_hdlr.retryTime, now)
1770        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
1771        self.root_logger.error('Nor this')
1772
1773def _get_temp_domain_socket():
1774    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
1775    os.close(fd)
1776    # just need a name - file can't be present, or we'll get an
1777    # 'address already in use' error.
1778    os.remove(fn)
1779    return fn
1780
1781@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1782class UnixSocketHandlerTest(SocketHandlerTest):
1783
1784    """Test for SocketHandler with unix sockets."""
1785
1786    if hasattr(socket, "AF_UNIX"):
1787        server_class = TestUnixStreamServer
1788
1789    def setUp(self):
1790        # override the definition in the base class
1791        self.address = _get_temp_domain_socket()
1792        SocketHandlerTest.setUp(self)
1793
1794    def tearDown(self):
1795        SocketHandlerTest.tearDown(self)
1796        os_helper.unlink(self.address)
1797
1798class DatagramHandlerTest(BaseTest):
1799
1800    """Test for DatagramHandler."""
1801
1802    server_class = TestUDPServer
1803    address = ('localhost', 0)
1804
1805    def setUp(self):
1806        """Set up a UDP server to receive log messages, and a DatagramHandler
1807        pointing to that server's address and port."""
1808        BaseTest.setUp(self)
1809        # Issue #29177: deal with errors that happen during setup
1810        self.server = self.sock_hdlr = self.server_exception = None
1811        try:
1812            self.server = server = self.server_class(self.address,
1813                                                     self.handle_datagram, 0.01)
1814            server.start()
1815            # Uncomment next line to test error recovery in setUp()
1816            # raise OSError('dummy error raised')
1817        except OSError as e:
1818            self.server_exception = e
1819            return
1820        server.ready.wait()
1821        hcls = logging.handlers.DatagramHandler
1822        if isinstance(server.server_address, tuple):
1823            self.sock_hdlr = hcls('localhost', server.port)
1824        else:
1825            self.sock_hdlr = hcls(server.server_address, None)
1826        self.log_output = ''
1827        self.root_logger.removeHandler(self.root_logger.handlers[0])
1828        self.root_logger.addHandler(self.sock_hdlr)
1829        self.handled = threading.Event()
1830
1831    def tearDown(self):
1832        """Shutdown the UDP server."""
1833        try:
1834            if self.server:
1835                self.server.stop()
1836            if self.sock_hdlr:
1837                self.root_logger.removeHandler(self.sock_hdlr)
1838                self.sock_hdlr.close()
1839        finally:
1840            BaseTest.tearDown(self)
1841
1842    def handle_datagram(self, request):
1843        slen = struct.pack('>L', 0) # length of prefix
1844        packet = request.packet[len(slen):]
1845        obj = pickle.loads(packet)
1846        record = logging.makeLogRecord(obj)
1847        self.log_output += record.msg + '\n'
1848        self.handled.set()
1849
1850    def test_output(self):
1851        # The log message sent to the DatagramHandler is properly received.
1852        if self.server_exception:
1853            self.skipTest(self.server_exception)
1854        logger = logging.getLogger("udp")
1855        logger.error("spam")
1856        self.handled.wait()
1857        self.handled.clear()
1858        logger.error("eggs")
1859        self.handled.wait()
1860        self.assertEqual(self.log_output, "spam\neggs\n")
1861
1862@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1863class UnixDatagramHandlerTest(DatagramHandlerTest):
1864
1865    """Test for DatagramHandler using Unix sockets."""
1866
1867    if hasattr(socket, "AF_UNIX"):
1868        server_class = TestUnixDatagramServer
1869
1870    def setUp(self):
1871        # override the definition in the base class
1872        self.address = _get_temp_domain_socket()
1873        DatagramHandlerTest.setUp(self)
1874
1875    def tearDown(self):
1876        DatagramHandlerTest.tearDown(self)
1877        os_helper.unlink(self.address)
1878
1879class SysLogHandlerTest(BaseTest):
1880
1881    """Test for SysLogHandler using UDP."""
1882
1883    server_class = TestUDPServer
1884    address = ('localhost', 0)
1885
1886    def setUp(self):
1887        """Set up a UDP server to receive log messages, and a SysLogHandler
1888        pointing to that server's address and port."""
1889        BaseTest.setUp(self)
1890        # Issue #29177: deal with errors that happen during setup
1891        self.server = self.sl_hdlr = self.server_exception = None
1892        try:
1893            self.server = server = self.server_class(self.address,
1894                                                     self.handle_datagram, 0.01)
1895            server.start()
1896            # Uncomment next line to test error recovery in setUp()
1897            # raise OSError('dummy error raised')
1898        except OSError as e:
1899            self.server_exception = e
1900            return
1901        server.ready.wait()
1902        hcls = logging.handlers.SysLogHandler
1903        if isinstance(server.server_address, tuple):
1904            self.sl_hdlr = hcls((server.server_address[0], server.port))
1905        else:
1906            self.sl_hdlr = hcls(server.server_address)
1907        self.log_output = ''
1908        self.root_logger.removeHandler(self.root_logger.handlers[0])
1909        self.root_logger.addHandler(self.sl_hdlr)
1910        self.handled = threading.Event()
1911
1912    def tearDown(self):
1913        """Shutdown the server."""
1914        try:
1915            if self.server:
1916                self.server.stop()
1917            if self.sl_hdlr:
1918                self.root_logger.removeHandler(self.sl_hdlr)
1919                self.sl_hdlr.close()
1920        finally:
1921            BaseTest.tearDown(self)
1922
1923    def handle_datagram(self, request):
1924        self.log_output = request.packet
1925        self.handled.set()
1926
1927    def test_output(self):
1928        if self.server_exception:
1929            self.skipTest(self.server_exception)
1930        # The log message sent to the SysLogHandler is properly received.
1931        logger = logging.getLogger("slh")
1932        logger.error("sp\xe4m")
1933        self.handled.wait()
1934        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1935        self.handled.clear()
1936        self.sl_hdlr.append_nul = False
1937        logger.error("sp\xe4m")
1938        self.handled.wait()
1939        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
1940        self.handled.clear()
1941        self.sl_hdlr.ident = "h\xe4m-"
1942        logger.error("sp\xe4m")
1943        self.handled.wait()
1944        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1945
1946@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1947class UnixSysLogHandlerTest(SysLogHandlerTest):
1948
1949    """Test for SysLogHandler with Unix sockets."""
1950
1951    if hasattr(socket, "AF_UNIX"):
1952        server_class = TestUnixDatagramServer
1953
1954    def setUp(self):
1955        # override the definition in the base class
1956        self.address = _get_temp_domain_socket()
1957        SysLogHandlerTest.setUp(self)
1958
1959    def tearDown(self):
1960        SysLogHandlerTest.tearDown(self)
1961        os_helper.unlink(self.address)
1962
1963@unittest.skipUnless(socket_helper.IPV6_ENABLED,
1964                     'IPv6 support required for this test.')
1965class IPv6SysLogHandlerTest(SysLogHandlerTest):
1966
1967    """Test for SysLogHandler with IPv6 host."""
1968
1969    server_class = TestUDPServer
1970    address = ('::1', 0)
1971
1972    def setUp(self):
1973        self.server_class.address_family = socket.AF_INET6
1974        super(IPv6SysLogHandlerTest, self).setUp()
1975
1976    def tearDown(self):
1977        self.server_class.address_family = socket.AF_INET
1978        super(IPv6SysLogHandlerTest, self).tearDown()
1979
1980class HTTPHandlerTest(BaseTest):
1981    """Test for HTTPHandler."""
1982
1983    def setUp(self):
1984        """Set up an HTTP server to receive log messages, and a HTTPHandler
1985        pointing to that server's address and port."""
1986        BaseTest.setUp(self)
1987        self.handled = threading.Event()
1988
1989    def handle_request(self, request):
1990        self.command = request.command
1991        self.log_data = urlparse(request.path)
1992        if self.command == 'POST':
1993            try:
1994                rlen = int(request.headers['Content-Length'])
1995                self.post_data = request.rfile.read(rlen)
1996            except:
1997                self.post_data = None
1998        request.send_response(200)
1999        request.end_headers()
2000        self.handled.set()
2001
2002    def test_output(self):
2003        # The log message sent to the HTTPHandler is properly received.
2004        logger = logging.getLogger("http")
2005        root_logger = self.root_logger
2006        root_logger.removeHandler(self.root_logger.handlers[0])
2007        for secure in (False, True):
2008            addr = ('localhost', 0)
2009            if secure:
2010                try:
2011                    import ssl
2012                except ImportError:
2013                    sslctx = None
2014                else:
2015                    here = os.path.dirname(__file__)
2016                    localhost_cert = os.path.join(here, "keycert.pem")
2017                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2018                    sslctx.load_cert_chain(localhost_cert)
2019
2020                    context = ssl.create_default_context(cafile=localhost_cert)
2021            else:
2022                sslctx = None
2023                context = None
2024            self.server = server = TestHTTPServer(addr, self.handle_request,
2025                                                    0.01, sslctx=sslctx)
2026            server.start()
2027            server.ready.wait()
2028            host = 'localhost:%d' % server.server_port
2029            secure_client = secure and sslctx
2030            self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
2031                                                       secure=secure_client,
2032                                                       context=context,
2033                                                       credentials=('foo', 'bar'))
2034            self.log_data = None
2035            root_logger.addHandler(self.h_hdlr)
2036
2037            for method in ('GET', 'POST'):
2038                self.h_hdlr.method = method
2039                self.handled.clear()
2040                msg = "sp\xe4m"
2041                logger.error(msg)
2042                self.handled.wait()
2043                self.assertEqual(self.log_data.path, '/frob')
2044                self.assertEqual(self.command, method)
2045                if method == 'GET':
2046                    d = parse_qs(self.log_data.query)
2047                else:
2048                    d = parse_qs(self.post_data.decode('utf-8'))
2049                self.assertEqual(d['name'], ['http'])
2050                self.assertEqual(d['funcName'], ['test_output'])
2051                self.assertEqual(d['msg'], [msg])
2052
2053            self.server.stop()
2054            self.root_logger.removeHandler(self.h_hdlr)
2055            self.h_hdlr.close()
2056
2057class MemoryTest(BaseTest):
2058
2059    """Test memory persistence of logger objects."""
2060
2061    def setUp(self):
2062        """Create a dict to remember potentially destroyed objects."""
2063        BaseTest.setUp(self)
2064        self._survivors = {}
2065
2066    def _watch_for_survival(self, *args):
2067        """Watch the given objects for survival, by creating weakrefs to
2068        them."""
2069        for obj in args:
2070            key = id(obj), repr(obj)
2071            self._survivors[key] = weakref.ref(obj)
2072
2073    def _assertTruesurvival(self):
2074        """Assert that all objects watched for survival have survived."""
2075        # Trigger cycle breaking.
2076        gc.collect()
2077        dead = []
2078        for (id_, repr_), ref in self._survivors.items():
2079            if ref() is None:
2080                dead.append(repr_)
2081        if dead:
2082            self.fail("%d objects should have survived "
2083                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
2084
2085    def test_persistent_loggers(self):
2086        # Logger objects are persistent and retain their configuration, even
2087        #  if visible references are destroyed.
2088        self.root_logger.setLevel(logging.INFO)
2089        foo = logging.getLogger("foo")
2090        self._watch_for_survival(foo)
2091        foo.setLevel(logging.DEBUG)
2092        self.root_logger.debug(self.next_message())
2093        foo.debug(self.next_message())
2094        self.assert_log_lines([
2095            ('foo', 'DEBUG', '2'),
2096        ])
2097        del foo
2098        # foo has survived.
2099        self._assertTruesurvival()
2100        # foo has retained its settings.
2101        bar = logging.getLogger("foo")
2102        bar.debug(self.next_message())
2103        self.assert_log_lines([
2104            ('foo', 'DEBUG', '2'),
2105            ('foo', 'DEBUG', '3'),
2106        ])
2107
2108
2109class EncodingTest(BaseTest):
2110    def test_encoding_plain_file(self):
2111        # In Python 2.x, a plain file object is treated as having no encoding.
2112        log = logging.getLogger("test")
2113        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
2114        os.close(fd)
2115        # the non-ascii data we write to the log.
2116        data = "foo\x80"
2117        try:
2118            handler = logging.FileHandler(fn, encoding="utf-8")
2119            log.addHandler(handler)
2120            try:
2121                # write non-ascii data to the log.
2122                log.warning(data)
2123            finally:
2124                log.removeHandler(handler)
2125                handler.close()
2126            # check we wrote exactly those bytes, ignoring trailing \n etc
2127            f = open(fn, encoding="utf-8")
2128            try:
2129                self.assertEqual(f.read().rstrip(), data)
2130            finally:
2131                f.close()
2132        finally:
2133            if os.path.isfile(fn):
2134                os.remove(fn)
2135
2136    def test_encoding_cyrillic_unicode(self):
2137        log = logging.getLogger("test")
2138        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
2139        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
2140        # Ensure it's written in a Cyrillic encoding
2141        writer_class = codecs.getwriter('cp1251')
2142        writer_class.encoding = 'cp1251'
2143        stream = io.BytesIO()
2144        writer = writer_class(stream, 'strict')
2145        handler = logging.StreamHandler(writer)
2146        log.addHandler(handler)
2147        try:
2148            log.warning(message)
2149        finally:
2150            log.removeHandler(handler)
2151            handler.close()
2152        # check we wrote exactly those bytes, ignoring trailing \n etc
2153        s = stream.getvalue()
2154        # Compare against what the data should be when encoded in CP-1251
2155        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
2156
2157
2158class WarningsTest(BaseTest):
2159
2160    def test_warnings(self):
2161        with warnings.catch_warnings():
2162            logging.captureWarnings(True)
2163            self.addCleanup(logging.captureWarnings, False)
2164            warnings.filterwarnings("always", category=UserWarning)
2165            stream = io.StringIO()
2166            h = logging.StreamHandler(stream)
2167            logger = logging.getLogger("py.warnings")
2168            logger.addHandler(h)
2169            warnings.warn("I'm warning you...")
2170            logger.removeHandler(h)
2171            s = stream.getvalue()
2172            h.close()
2173            self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
2174
2175            # See if an explicit file uses the original implementation
2176            a_file = io.StringIO()
2177            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
2178                                 a_file, "Dummy line")
2179            s = a_file.getvalue()
2180            a_file.close()
2181            self.assertEqual(s,
2182                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
2183
2184    def test_warnings_no_handlers(self):
2185        with warnings.catch_warnings():
2186            logging.captureWarnings(True)
2187            self.addCleanup(logging.captureWarnings, False)
2188
2189            # confirm our assumption: no loggers are set
2190            logger = logging.getLogger("py.warnings")
2191            self.assertEqual(logger.handlers, [])
2192
2193            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
2194            self.assertEqual(len(logger.handlers), 1)
2195            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
2196
2197
2198def formatFunc(format, datefmt=None):
2199    return logging.Formatter(format, datefmt)
2200
2201class myCustomFormatter:
2202    def __init__(self, fmt, datefmt=None):
2203        pass
2204
2205def handlerFunc():
2206    return logging.StreamHandler()
2207
2208class CustomHandler(logging.StreamHandler):
2209    pass
2210
2211class ConfigDictTest(BaseTest):
2212
2213    """Reading logging config from a dictionary."""
2214
2215    check_no_resource_warning = warnings_helper.check_no_resource_warning
2216    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2217
2218    # config0 is a standard configuration.
2219    config0 = {
2220        'version': 1,
2221        'formatters': {
2222            'form1' : {
2223                'format' : '%(levelname)s ++ %(message)s',
2224            },
2225        },
2226        'handlers' : {
2227            'hand1' : {
2228                'class' : 'logging.StreamHandler',
2229                'formatter' : 'form1',
2230                'level' : 'NOTSET',
2231                'stream'  : 'ext://sys.stdout',
2232            },
2233        },
2234        'root' : {
2235            'level' : 'WARNING',
2236            'handlers' : ['hand1'],
2237        },
2238    }
2239
2240    # config1 adds a little to the standard configuration.
2241    config1 = {
2242        'version': 1,
2243        'formatters': {
2244            'form1' : {
2245                'format' : '%(levelname)s ++ %(message)s',
2246            },
2247        },
2248        'handlers' : {
2249            'hand1' : {
2250                'class' : 'logging.StreamHandler',
2251                'formatter' : 'form1',
2252                'level' : 'NOTSET',
2253                'stream'  : 'ext://sys.stdout',
2254            },
2255        },
2256        'loggers' : {
2257            'compiler.parser' : {
2258                'level' : 'DEBUG',
2259                'handlers' : ['hand1'],
2260            },
2261        },
2262        'root' : {
2263            'level' : 'WARNING',
2264        },
2265    }
2266
2267    # config1a moves the handler to the root. Used with config8a
2268    config1a = {
2269        'version': 1,
2270        'formatters': {
2271            'form1' : {
2272                'format' : '%(levelname)s ++ %(message)s',
2273            },
2274        },
2275        'handlers' : {
2276            'hand1' : {
2277                'class' : 'logging.StreamHandler',
2278                'formatter' : 'form1',
2279                'level' : 'NOTSET',
2280                'stream'  : 'ext://sys.stdout',
2281            },
2282        },
2283        'loggers' : {
2284            'compiler.parser' : {
2285                'level' : 'DEBUG',
2286            },
2287        },
2288        'root' : {
2289            'level' : 'WARNING',
2290            'handlers' : ['hand1'],
2291        },
2292    }
2293
2294    # config2 has a subtle configuration error that should be reported
2295    config2 = {
2296        'version': 1,
2297        'formatters': {
2298            'form1' : {
2299                'format' : '%(levelname)s ++ %(message)s',
2300            },
2301        },
2302        'handlers' : {
2303            'hand1' : {
2304                'class' : 'logging.StreamHandler',
2305                'formatter' : 'form1',
2306                'level' : 'NOTSET',
2307                'stream'  : 'ext://sys.stdbout',
2308            },
2309        },
2310        'loggers' : {
2311            'compiler.parser' : {
2312                'level' : 'DEBUG',
2313                'handlers' : ['hand1'],
2314            },
2315        },
2316        'root' : {
2317            'level' : 'WARNING',
2318        },
2319    }
2320
2321    # As config1 but with a misspelt level on a handler
2322    config2a = {
2323        'version': 1,
2324        'formatters': {
2325            'form1' : {
2326                'format' : '%(levelname)s ++ %(message)s',
2327            },
2328        },
2329        'handlers' : {
2330            'hand1' : {
2331                'class' : 'logging.StreamHandler',
2332                'formatter' : 'form1',
2333                'level' : 'NTOSET',
2334                'stream'  : 'ext://sys.stdout',
2335            },
2336        },
2337        'loggers' : {
2338            'compiler.parser' : {
2339                'level' : 'DEBUG',
2340                'handlers' : ['hand1'],
2341            },
2342        },
2343        'root' : {
2344            'level' : 'WARNING',
2345        },
2346    }
2347
2348
2349    # As config1 but with a misspelt level on a logger
2350    config2b = {
2351        'version': 1,
2352        'formatters': {
2353            'form1' : {
2354                'format' : '%(levelname)s ++ %(message)s',
2355            },
2356        },
2357        'handlers' : {
2358            'hand1' : {
2359                'class' : 'logging.StreamHandler',
2360                'formatter' : 'form1',
2361                'level' : 'NOTSET',
2362                'stream'  : 'ext://sys.stdout',
2363            },
2364        },
2365        'loggers' : {
2366            'compiler.parser' : {
2367                'level' : 'DEBUG',
2368                'handlers' : ['hand1'],
2369            },
2370        },
2371        'root' : {
2372            'level' : 'WRANING',
2373        },
2374    }
2375
2376    # config3 has a less subtle configuration error
2377    config3 = {
2378        'version': 1,
2379        'formatters': {
2380            'form1' : {
2381                'format' : '%(levelname)s ++ %(message)s',
2382            },
2383        },
2384        'handlers' : {
2385            'hand1' : {
2386                'class' : 'logging.StreamHandler',
2387                'formatter' : 'misspelled_name',
2388                'level' : 'NOTSET',
2389                'stream'  : 'ext://sys.stdout',
2390            },
2391        },
2392        'loggers' : {
2393            'compiler.parser' : {
2394                'level' : 'DEBUG',
2395                'handlers' : ['hand1'],
2396            },
2397        },
2398        'root' : {
2399            'level' : 'WARNING',
2400        },
2401    }
2402
2403    # config4 specifies a custom formatter class to be loaded
2404    config4 = {
2405        'version': 1,
2406        'formatters': {
2407            'form1' : {
2408                '()' : __name__ + '.ExceptionFormatter',
2409                'format' : '%(levelname)s:%(name)s:%(message)s',
2410            },
2411        },
2412        'handlers' : {
2413            'hand1' : {
2414                'class' : 'logging.StreamHandler',
2415                'formatter' : 'form1',
2416                'level' : 'NOTSET',
2417                'stream'  : 'ext://sys.stdout',
2418            },
2419        },
2420        'root' : {
2421            'level' : 'NOTSET',
2422                'handlers' : ['hand1'],
2423        },
2424    }
2425
2426    # As config4 but using an actual callable rather than a string
2427    config4a = {
2428        'version': 1,
2429        'formatters': {
2430            'form1' : {
2431                '()' : ExceptionFormatter,
2432                'format' : '%(levelname)s:%(name)s:%(message)s',
2433            },
2434            'form2' : {
2435                '()' : __name__ + '.formatFunc',
2436                'format' : '%(levelname)s:%(name)s:%(message)s',
2437            },
2438            'form3' : {
2439                '()' : formatFunc,
2440                'format' : '%(levelname)s:%(name)s:%(message)s',
2441            },
2442        },
2443        'handlers' : {
2444            'hand1' : {
2445                'class' : 'logging.StreamHandler',
2446                'formatter' : 'form1',
2447                'level' : 'NOTSET',
2448                'stream'  : 'ext://sys.stdout',
2449            },
2450            'hand2' : {
2451                '()' : handlerFunc,
2452            },
2453        },
2454        'root' : {
2455            'level' : 'NOTSET',
2456                'handlers' : ['hand1'],
2457        },
2458    }
2459
2460    # config5 specifies a custom handler class to be loaded
2461    config5 = {
2462        'version': 1,
2463        'formatters': {
2464            'form1' : {
2465                'format' : '%(levelname)s ++ %(message)s',
2466            },
2467        },
2468        'handlers' : {
2469            'hand1' : {
2470                'class' : __name__ + '.CustomHandler',
2471                'formatter' : 'form1',
2472                'level' : 'NOTSET',
2473                'stream'  : 'ext://sys.stdout',
2474            },
2475        },
2476        'loggers' : {
2477            'compiler.parser' : {
2478                'level' : 'DEBUG',
2479                'handlers' : ['hand1'],
2480            },
2481        },
2482        'root' : {
2483            'level' : 'WARNING',
2484        },
2485    }
2486
2487    # config6 specifies a custom handler class to be loaded
2488    # but has bad arguments
2489    config6 = {
2490        'version': 1,
2491        'formatters': {
2492            'form1' : {
2493                'format' : '%(levelname)s ++ %(message)s',
2494            },
2495        },
2496        'handlers' : {
2497            'hand1' : {
2498                'class' : __name__ + '.CustomHandler',
2499                'formatter' : 'form1',
2500                'level' : 'NOTSET',
2501                'stream'  : 'ext://sys.stdout',
2502                '9' : 'invalid parameter name',
2503            },
2504        },
2505        'loggers' : {
2506            'compiler.parser' : {
2507                'level' : 'DEBUG',
2508                'handlers' : ['hand1'],
2509            },
2510        },
2511        'root' : {
2512            'level' : 'WARNING',
2513        },
2514    }
2515
2516    # config 7 does not define compiler.parser but defines compiler.lexer
2517    # so compiler.parser should be disabled after applying it
2518    config7 = {
2519        'version': 1,
2520        'formatters': {
2521            'form1' : {
2522                'format' : '%(levelname)s ++ %(message)s',
2523            },
2524        },
2525        'handlers' : {
2526            'hand1' : {
2527                'class' : 'logging.StreamHandler',
2528                'formatter' : 'form1',
2529                'level' : 'NOTSET',
2530                'stream'  : 'ext://sys.stdout',
2531            },
2532        },
2533        'loggers' : {
2534            'compiler.lexer' : {
2535                'level' : 'DEBUG',
2536                'handlers' : ['hand1'],
2537            },
2538        },
2539        'root' : {
2540            'level' : 'WARNING',
2541        },
2542    }
2543
2544    # config8 defines both compiler and compiler.lexer
2545    # so compiler.parser should not be disabled (since
2546    # compiler is defined)
2547    config8 = {
2548        'version': 1,
2549        'disable_existing_loggers' : False,
2550        'formatters': {
2551            'form1' : {
2552                'format' : '%(levelname)s ++ %(message)s',
2553            },
2554        },
2555        'handlers' : {
2556            'hand1' : {
2557                'class' : 'logging.StreamHandler',
2558                'formatter' : 'form1',
2559                'level' : 'NOTSET',
2560                'stream'  : 'ext://sys.stdout',
2561            },
2562        },
2563        'loggers' : {
2564            'compiler' : {
2565                'level' : 'DEBUG',
2566                'handlers' : ['hand1'],
2567            },
2568            'compiler.lexer' : {
2569            },
2570        },
2571        'root' : {
2572            'level' : 'WARNING',
2573        },
2574    }
2575
2576    # config8a disables existing loggers
2577    config8a = {
2578        'version': 1,
2579        'disable_existing_loggers' : True,
2580        'formatters': {
2581            'form1' : {
2582                'format' : '%(levelname)s ++ %(message)s',
2583            },
2584        },
2585        'handlers' : {
2586            'hand1' : {
2587                'class' : 'logging.StreamHandler',
2588                'formatter' : 'form1',
2589                'level' : 'NOTSET',
2590                'stream'  : 'ext://sys.stdout',
2591            },
2592        },
2593        'loggers' : {
2594            'compiler' : {
2595                'level' : 'DEBUG',
2596                'handlers' : ['hand1'],
2597            },
2598            'compiler.lexer' : {
2599            },
2600        },
2601        'root' : {
2602            'level' : 'WARNING',
2603        },
2604    }
2605
2606    config9 = {
2607        'version': 1,
2608        'formatters': {
2609            'form1' : {
2610                'format' : '%(levelname)s ++ %(message)s',
2611            },
2612        },
2613        'handlers' : {
2614            'hand1' : {
2615                'class' : 'logging.StreamHandler',
2616                'formatter' : 'form1',
2617                'level' : 'WARNING',
2618                'stream'  : 'ext://sys.stdout',
2619            },
2620        },
2621        'loggers' : {
2622            'compiler.parser' : {
2623                'level' : 'WARNING',
2624                'handlers' : ['hand1'],
2625            },
2626        },
2627        'root' : {
2628            'level' : 'NOTSET',
2629        },
2630    }
2631
2632    config9a = {
2633        'version': 1,
2634        'incremental' : True,
2635        'handlers' : {
2636            'hand1' : {
2637                'level' : 'WARNING',
2638            },
2639        },
2640        'loggers' : {
2641            'compiler.parser' : {
2642                'level' : 'INFO',
2643            },
2644        },
2645    }
2646
2647    config9b = {
2648        'version': 1,
2649        'incremental' : True,
2650        'handlers' : {
2651            'hand1' : {
2652                'level' : 'INFO',
2653            },
2654        },
2655        'loggers' : {
2656            'compiler.parser' : {
2657                'level' : 'INFO',
2658            },
2659        },
2660    }
2661
2662    # As config1 but with a filter added
2663    config10 = {
2664        'version': 1,
2665        'formatters': {
2666            'form1' : {
2667                'format' : '%(levelname)s ++ %(message)s',
2668            },
2669        },
2670        'filters' : {
2671            'filt1' : {
2672                'name' : 'compiler.parser',
2673            },
2674        },
2675        'handlers' : {
2676            'hand1' : {
2677                'class' : 'logging.StreamHandler',
2678                'formatter' : 'form1',
2679                'level' : 'NOTSET',
2680                'stream'  : 'ext://sys.stdout',
2681                'filters' : ['filt1'],
2682            },
2683        },
2684        'loggers' : {
2685            'compiler.parser' : {
2686                'level' : 'DEBUG',
2687                'filters' : ['filt1'],
2688            },
2689        },
2690        'root' : {
2691            'level' : 'WARNING',
2692            'handlers' : ['hand1'],
2693        },
2694    }
2695
2696    # As config1 but using cfg:// references
2697    config11 = {
2698        'version': 1,
2699        'true_formatters': {
2700            'form1' : {
2701                'format' : '%(levelname)s ++ %(message)s',
2702            },
2703        },
2704        'handler_configs': {
2705            'hand1' : {
2706                'class' : 'logging.StreamHandler',
2707                'formatter' : 'form1',
2708                'level' : 'NOTSET',
2709                'stream'  : 'ext://sys.stdout',
2710            },
2711        },
2712        'formatters' : 'cfg://true_formatters',
2713        'handlers' : {
2714            'hand1' : 'cfg://handler_configs[hand1]',
2715        },
2716        'loggers' : {
2717            'compiler.parser' : {
2718                'level' : 'DEBUG',
2719                'handlers' : ['hand1'],
2720            },
2721        },
2722        'root' : {
2723            'level' : 'WARNING',
2724        },
2725    }
2726
2727    # As config11 but missing the version key
2728    config12 = {
2729        'true_formatters': {
2730            'form1' : {
2731                'format' : '%(levelname)s ++ %(message)s',
2732            },
2733        },
2734        'handler_configs': {
2735            'hand1' : {
2736                'class' : 'logging.StreamHandler',
2737                'formatter' : 'form1',
2738                'level' : 'NOTSET',
2739                'stream'  : 'ext://sys.stdout',
2740            },
2741        },
2742        'formatters' : 'cfg://true_formatters',
2743        'handlers' : {
2744            'hand1' : 'cfg://handler_configs[hand1]',
2745        },
2746        'loggers' : {
2747            'compiler.parser' : {
2748                'level' : 'DEBUG',
2749                'handlers' : ['hand1'],
2750            },
2751        },
2752        'root' : {
2753            'level' : 'WARNING',
2754        },
2755    }
2756
2757    # As config11 but using an unsupported version
2758    config13 = {
2759        'version': 2,
2760        'true_formatters': {
2761            'form1' : {
2762                'format' : '%(levelname)s ++ %(message)s',
2763            },
2764        },
2765        'handler_configs': {
2766            'hand1' : {
2767                'class' : 'logging.StreamHandler',
2768                'formatter' : 'form1',
2769                'level' : 'NOTSET',
2770                'stream'  : 'ext://sys.stdout',
2771            },
2772        },
2773        'formatters' : 'cfg://true_formatters',
2774        'handlers' : {
2775            'hand1' : 'cfg://handler_configs[hand1]',
2776        },
2777        'loggers' : {
2778            'compiler.parser' : {
2779                'level' : 'DEBUG',
2780                'handlers' : ['hand1'],
2781            },
2782        },
2783        'root' : {
2784            'level' : 'WARNING',
2785        },
2786    }
2787
2788    # As config0, but with properties
2789    config14 = {
2790        'version': 1,
2791        'formatters': {
2792            'form1' : {
2793                'format' : '%(levelname)s ++ %(message)s',
2794            },
2795        },
2796        'handlers' : {
2797            'hand1' : {
2798                'class' : 'logging.StreamHandler',
2799                'formatter' : 'form1',
2800                'level' : 'NOTSET',
2801                'stream'  : 'ext://sys.stdout',
2802                '.': {
2803                    'foo': 'bar',
2804                    'terminator': '!\n',
2805                }
2806            },
2807        },
2808        'root' : {
2809            'level' : 'WARNING',
2810            'handlers' : ['hand1'],
2811        },
2812    }
2813
2814    out_of_order = {
2815        "version": 1,
2816        "formatters": {
2817            "mySimpleFormatter": {
2818                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2819                "style": "$"
2820            }
2821        },
2822        "handlers": {
2823            "fileGlobal": {
2824                "class": "logging.StreamHandler",
2825                "level": "DEBUG",
2826                "formatter": "mySimpleFormatter"
2827            },
2828            "bufferGlobal": {
2829                "class": "logging.handlers.MemoryHandler",
2830                "capacity": 5,
2831                "formatter": "mySimpleFormatter",
2832                "target": "fileGlobal",
2833                "level": "DEBUG"
2834                }
2835        },
2836        "loggers": {
2837            "mymodule": {
2838                "level": "DEBUG",
2839                "handlers": ["bufferGlobal"],
2840                "propagate": "true"
2841            }
2842        }
2843    }
2844
2845    # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False
2846    custom_formatter_class_validate = {
2847        'version': 1,
2848        'formatters': {
2849            'form1': {
2850                '()': __name__ + '.ExceptionFormatter',
2851                'format': '%(levelname)s:%(name)s:%(message)s',
2852                'validate': False,
2853            },
2854        },
2855        'handlers' : {
2856            'hand1' : {
2857                'class': 'logging.StreamHandler',
2858                'formatter': 'form1',
2859                'level': 'NOTSET',
2860                'stream': 'ext://sys.stdout',
2861            },
2862        },
2863        "loggers": {
2864            "my_test_logger_custom_formatter": {
2865                "level": "DEBUG",
2866                "handlers": ["hand1"],
2867                "propagate": "true"
2868            }
2869        }
2870    }
2871
2872    # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False
2873    custom_formatter_class_validate2 = {
2874        'version': 1,
2875        'formatters': {
2876            'form1': {
2877                'class': __name__ + '.ExceptionFormatter',
2878                'format': '%(levelname)s:%(name)s:%(message)s',
2879                'validate': False,
2880            },
2881        },
2882        'handlers' : {
2883            'hand1' : {
2884                'class': 'logging.StreamHandler',
2885                'formatter': 'form1',
2886                'level': 'NOTSET',
2887                'stream': 'ext://sys.stdout',
2888            },
2889        },
2890        "loggers": {
2891            "my_test_logger_custom_formatter": {
2892                "level": "DEBUG",
2893                "handlers": ["hand1"],
2894                "propagate": "true"
2895            }
2896        }
2897    }
2898
2899    # Configuration with custom class that is not inherited from logging.Formatter
2900    custom_formatter_class_validate3 = {
2901        'version': 1,
2902        'formatters': {
2903            'form1': {
2904                'class': __name__ + '.myCustomFormatter',
2905                'format': '%(levelname)s:%(name)s:%(message)s',
2906                'validate': False,
2907            },
2908        },
2909        'handlers' : {
2910            'hand1' : {
2911                'class': 'logging.StreamHandler',
2912                'formatter': 'form1',
2913                'level': 'NOTSET',
2914                'stream': 'ext://sys.stdout',
2915            },
2916        },
2917        "loggers": {
2918            "my_test_logger_custom_formatter": {
2919                "level": "DEBUG",
2920                "handlers": ["hand1"],
2921                "propagate": "true"
2922            }
2923        }
2924    }
2925
2926    # Configuration with custom function and 'validate' set to False
2927    custom_formatter_with_function = {
2928        'version': 1,
2929        'formatters': {
2930            'form1': {
2931                '()': formatFunc,
2932                'format': '%(levelname)s:%(name)s:%(message)s',
2933                'validate': False,
2934            },
2935        },
2936        'handlers' : {
2937            'hand1' : {
2938                'class': 'logging.StreamHandler',
2939                'formatter': 'form1',
2940                'level': 'NOTSET',
2941                'stream': 'ext://sys.stdout',
2942            },
2943        },
2944        "loggers": {
2945            "my_test_logger_custom_formatter": {
2946                "level": "DEBUG",
2947                "handlers": ["hand1"],
2948                "propagate": "true"
2949            }
2950        }
2951    }
2952
2953    def apply_config(self, conf):
2954        logging.config.dictConfig(conf)
2955
2956    def test_config0_ok(self):
2957        # A simple config which overrides the default settings.
2958        with support.captured_stdout() as output:
2959            self.apply_config(self.config0)
2960            logger = logging.getLogger()
2961            # Won't output anything
2962            logger.info(self.next_message())
2963            # Outputs a message
2964            logger.error(self.next_message())
2965            self.assert_log_lines([
2966                ('ERROR', '2'),
2967            ], stream=output)
2968            # Original logger output is empty.
2969            self.assert_log_lines([])
2970
2971    def test_config1_ok(self, config=config1):
2972        # A config defining a sub-parser as well.
2973        with support.captured_stdout() as output:
2974            self.apply_config(config)
2975            logger = logging.getLogger("compiler.parser")
2976            # Both will output a message
2977            logger.info(self.next_message())
2978            logger.error(self.next_message())
2979            self.assert_log_lines([
2980                ('INFO', '1'),
2981                ('ERROR', '2'),
2982            ], stream=output)
2983            # Original logger output is empty.
2984            self.assert_log_lines([])
2985
2986    def test_config2_failure(self):
2987        # A simple config which overrides the default settings.
2988        self.assertRaises(Exception, self.apply_config, self.config2)
2989
2990    def test_config2a_failure(self):
2991        # A simple config which overrides the default settings.
2992        self.assertRaises(Exception, self.apply_config, self.config2a)
2993
2994    def test_config2b_failure(self):
2995        # A simple config which overrides the default settings.
2996        self.assertRaises(Exception, self.apply_config, self.config2b)
2997
2998    def test_config3_failure(self):
2999        # A simple config which overrides the default settings.
3000        self.assertRaises(Exception, self.apply_config, self.config3)
3001
3002    def test_config4_ok(self):
3003        # A config specifying a custom formatter class.
3004        with support.captured_stdout() as output:
3005            self.apply_config(self.config4)
3006            #logger = logging.getLogger()
3007            try:
3008                raise RuntimeError()
3009            except RuntimeError:
3010                logging.exception("just testing")
3011            sys.stdout.seek(0)
3012            self.assertEqual(output.getvalue(),
3013                "ERROR:root:just testing\nGot a [RuntimeError]\n")
3014            # Original logger output is empty
3015            self.assert_log_lines([])
3016
3017    def test_config4a_ok(self):
3018        # A config specifying a custom formatter class.
3019        with support.captured_stdout() as output:
3020            self.apply_config(self.config4a)
3021            #logger = logging.getLogger()
3022            try:
3023                raise RuntimeError()
3024            except RuntimeError:
3025                logging.exception("just testing")
3026            sys.stdout.seek(0)
3027            self.assertEqual(output.getvalue(),
3028                "ERROR:root:just testing\nGot a [RuntimeError]\n")
3029            # Original logger output is empty
3030            self.assert_log_lines([])
3031
3032    def test_config5_ok(self):
3033        self.test_config1_ok(config=self.config5)
3034
3035    def test_config6_failure(self):
3036        self.assertRaises(Exception, self.apply_config, self.config6)
3037
3038    def test_config7_ok(self):
3039        with support.captured_stdout() as output:
3040            self.apply_config(self.config1)
3041            logger = logging.getLogger("compiler.parser")
3042            # Both will output a message
3043            logger.info(self.next_message())
3044            logger.error(self.next_message())
3045            self.assert_log_lines([
3046                ('INFO', '1'),
3047                ('ERROR', '2'),
3048            ], stream=output)
3049            # Original logger output is empty.
3050            self.assert_log_lines([])
3051        with support.captured_stdout() as output:
3052            self.apply_config(self.config7)
3053            logger = logging.getLogger("compiler.parser")
3054            self.assertTrue(logger.disabled)
3055            logger = logging.getLogger("compiler.lexer")
3056            # Both will output a message
3057            logger.info(self.next_message())
3058            logger.error(self.next_message())
3059            self.assert_log_lines([
3060                ('INFO', '3'),
3061                ('ERROR', '4'),
3062            ], stream=output)
3063            # Original logger output is empty.
3064            self.assert_log_lines([])
3065
3066    # Same as test_config_7_ok but don't disable old loggers.
3067    def test_config_8_ok(self):
3068        with support.captured_stdout() as output:
3069            self.apply_config(self.config1)
3070            logger = logging.getLogger("compiler.parser")
3071            # All will output a message
3072            logger.info(self.next_message())
3073            logger.error(self.next_message())
3074            self.assert_log_lines([
3075                ('INFO', '1'),
3076                ('ERROR', '2'),
3077            ], stream=output)
3078            # Original logger output is empty.
3079            self.assert_log_lines([])
3080        with support.captured_stdout() as output:
3081            self.apply_config(self.config8)
3082            logger = logging.getLogger("compiler.parser")
3083            self.assertFalse(logger.disabled)
3084            # Both will output a message
3085            logger.info(self.next_message())
3086            logger.error(self.next_message())
3087            logger = logging.getLogger("compiler.lexer")
3088            # Both will output a message
3089            logger.info(self.next_message())
3090            logger.error(self.next_message())
3091            self.assert_log_lines([
3092                ('INFO', '3'),
3093                ('ERROR', '4'),
3094                ('INFO', '5'),
3095                ('ERROR', '6'),
3096            ], stream=output)
3097            # Original logger output is empty.
3098            self.assert_log_lines([])
3099
3100    def test_config_8a_ok(self):
3101        with support.captured_stdout() as output:
3102            self.apply_config(self.config1a)
3103            logger = logging.getLogger("compiler.parser")
3104            # See issue #11424. compiler-hyphenated sorts
3105            # between compiler and compiler.xyz and this
3106            # was preventing compiler.xyz from being included
3107            # in the child loggers of compiler because of an
3108            # overzealous loop termination condition.
3109            hyphenated = logging.getLogger('compiler-hyphenated')
3110            # All will output a message
3111            logger.info(self.next_message())
3112            logger.error(self.next_message())
3113            hyphenated.critical(self.next_message())
3114            self.assert_log_lines([
3115                ('INFO', '1'),
3116                ('ERROR', '2'),
3117                ('CRITICAL', '3'),
3118            ], stream=output)
3119            # Original logger output is empty.
3120            self.assert_log_lines([])
3121        with support.captured_stdout() as output:
3122            self.apply_config(self.config8a)
3123            logger = logging.getLogger("compiler.parser")
3124            self.assertFalse(logger.disabled)
3125            # Both will output a message
3126            logger.info(self.next_message())
3127            logger.error(self.next_message())
3128            logger = logging.getLogger("compiler.lexer")
3129            # Both will output a message
3130            logger.info(self.next_message())
3131            logger.error(self.next_message())
3132            # Will not appear
3133            hyphenated.critical(self.next_message())
3134            self.assert_log_lines([
3135                ('INFO', '4'),
3136                ('ERROR', '5'),
3137                ('INFO', '6'),
3138                ('ERROR', '7'),
3139            ], stream=output)
3140            # Original logger output is empty.
3141            self.assert_log_lines([])
3142
3143    def test_config_9_ok(self):
3144        with support.captured_stdout() as output:
3145            self.apply_config(self.config9)
3146            logger = logging.getLogger("compiler.parser")
3147            # Nothing will be output since both handler and logger are set to WARNING
3148            logger.info(self.next_message())
3149            self.assert_log_lines([], stream=output)
3150            self.apply_config(self.config9a)
3151            # Nothing will be output since handler is still set to WARNING
3152            logger.info(self.next_message())
3153            self.assert_log_lines([], stream=output)
3154            self.apply_config(self.config9b)
3155            # Message should now be output
3156            logger.info(self.next_message())
3157            self.assert_log_lines([
3158                ('INFO', '3'),
3159            ], stream=output)
3160
3161    def test_config_10_ok(self):
3162        with support.captured_stdout() as output:
3163            self.apply_config(self.config10)
3164            logger = logging.getLogger("compiler.parser")
3165            logger.warning(self.next_message())
3166            logger = logging.getLogger('compiler')
3167            # Not output, because filtered
3168            logger.warning(self.next_message())
3169            logger = logging.getLogger('compiler.lexer')
3170            # Not output, because filtered
3171            logger.warning(self.next_message())
3172            logger = logging.getLogger("compiler.parser.codegen")
3173            # Output, as not filtered
3174            logger.error(self.next_message())
3175            self.assert_log_lines([
3176                ('WARNING', '1'),
3177                ('ERROR', '4'),
3178            ], stream=output)
3179
3180    def test_config11_ok(self):
3181        self.test_config1_ok(self.config11)
3182
3183    def test_config12_failure(self):
3184        self.assertRaises(Exception, self.apply_config, self.config12)
3185
3186    def test_config13_failure(self):
3187        self.assertRaises(Exception, self.apply_config, self.config13)
3188
3189    def test_config14_ok(self):
3190        with support.captured_stdout() as output:
3191            self.apply_config(self.config14)
3192            h = logging._handlers['hand1']
3193            self.assertEqual(h.foo, 'bar')
3194            self.assertEqual(h.terminator, '!\n')
3195            logging.warning('Exclamation')
3196            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3197
3198    def test_config15_ok(self):
3199
3200        def cleanup(h1, fn):
3201            h1.close()
3202            os.remove(fn)
3203
3204        with self.check_no_resource_warning():
3205            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
3206            os.close(fd)
3207
3208            config = {
3209                "version": 1,
3210                "handlers": {
3211                    "file": {
3212                        "class": "logging.FileHandler",
3213                        "filename": fn,
3214                        "encoding": "utf-8",
3215                    }
3216                },
3217                "root": {
3218                    "handlers": ["file"]
3219                }
3220            }
3221
3222            self.apply_config(config)
3223            self.apply_config(config)
3224
3225        handler = logging.root.handlers[0]
3226        self.addCleanup(cleanup, handler, fn)
3227
3228    def setup_via_listener(self, text, verify=None):
3229        text = text.encode("utf-8")
3230        # Ask for a randomly assigned port (by using port 0)
3231        t = logging.config.listen(0, verify)
3232        t.start()
3233        t.ready.wait()
3234        # Now get the port allocated
3235        port = t.port
3236        t.ready.clear()
3237        try:
3238            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3239            sock.settimeout(2.0)
3240            sock.connect(('localhost', port))
3241
3242            slen = struct.pack('>L', len(text))
3243            s = slen + text
3244            sentsofar = 0
3245            left = len(s)
3246            while left > 0:
3247                sent = sock.send(s[sentsofar:])
3248                sentsofar += sent
3249                left -= sent
3250            sock.close()
3251        finally:
3252            t.ready.wait(2.0)
3253            logging.config.stopListening()
3254            threading_helper.join_thread(t)
3255
3256    def test_listen_config_10_ok(self):
3257        with support.captured_stdout() as output:
3258            self.setup_via_listener(json.dumps(self.config10))
3259            logger = logging.getLogger("compiler.parser")
3260            logger.warning(self.next_message())
3261            logger = logging.getLogger('compiler')
3262            # Not output, because filtered
3263            logger.warning(self.next_message())
3264            logger = logging.getLogger('compiler.lexer')
3265            # Not output, because filtered
3266            logger.warning(self.next_message())
3267            logger = logging.getLogger("compiler.parser.codegen")
3268            # Output, as not filtered
3269            logger.error(self.next_message())
3270            self.assert_log_lines([
3271                ('WARNING', '1'),
3272                ('ERROR', '4'),
3273            ], stream=output)
3274
3275    def test_listen_config_1_ok(self):
3276        with support.captured_stdout() as output:
3277            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
3278            logger = logging.getLogger("compiler.parser")
3279            # Both will output a message
3280            logger.info(self.next_message())
3281            logger.error(self.next_message())
3282            self.assert_log_lines([
3283                ('INFO', '1'),
3284                ('ERROR', '2'),
3285            ], stream=output)
3286            # Original logger output is empty.
3287            self.assert_log_lines([])
3288
3289    def test_listen_verify(self):
3290
3291        def verify_fail(stuff):
3292            return None
3293
3294        def verify_reverse(stuff):
3295            return stuff[::-1]
3296
3297        logger = logging.getLogger("compiler.parser")
3298        to_send = textwrap.dedent(ConfigFileTest.config1)
3299        # First, specify a verification function that will fail.
3300        # We expect to see no output, since our configuration
3301        # never took effect.
3302        with support.captured_stdout() as output:
3303            self.setup_via_listener(to_send, verify_fail)
3304            # Both will output a message
3305            logger.info(self.next_message())
3306            logger.error(self.next_message())
3307        self.assert_log_lines([], stream=output)
3308        # Original logger output has the stuff we logged.
3309        self.assert_log_lines([
3310            ('INFO', '1'),
3311            ('ERROR', '2'),
3312        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3313
3314        # Now, perform no verification. Our configuration
3315        # should take effect.
3316
3317        with support.captured_stdout() as output:
3318            self.setup_via_listener(to_send)    # no verify callable specified
3319            logger = logging.getLogger("compiler.parser")
3320            # Both will output a message
3321            logger.info(self.next_message())
3322            logger.error(self.next_message())
3323        self.assert_log_lines([
3324            ('INFO', '3'),
3325            ('ERROR', '4'),
3326        ], stream=output)
3327        # Original logger output still has the stuff we logged before.
3328        self.assert_log_lines([
3329            ('INFO', '1'),
3330            ('ERROR', '2'),
3331        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3332
3333        # Now, perform verification which transforms the bytes.
3334
3335        with support.captured_stdout() as output:
3336            self.setup_via_listener(to_send[::-1], verify_reverse)
3337            logger = logging.getLogger("compiler.parser")
3338            # Both will output a message
3339            logger.info(self.next_message())
3340            logger.error(self.next_message())
3341        self.assert_log_lines([
3342            ('INFO', '5'),
3343            ('ERROR', '6'),
3344        ], stream=output)
3345        # Original logger output still has the stuff we logged before.
3346        self.assert_log_lines([
3347            ('INFO', '1'),
3348            ('ERROR', '2'),
3349        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3350
3351    def test_out_of_order(self):
3352        self.assertRaises(ValueError, self.apply_config, self.out_of_order)
3353
3354    def test_out_of_order_with_dollar_style(self):
3355        config = copy.deepcopy(self.out_of_order)
3356        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"
3357
3358        self.apply_config(config)
3359        handler = logging.getLogger('mymodule').handlers[0]
3360        self.assertIsInstance(handler.target, logging.Handler)
3361        self.assertIsInstance(handler.formatter._style,
3362                              logging.StringTemplateStyle)
3363
3364    def test_custom_formatter_class_with_validate(self):
3365        self.apply_config(self.custom_formatter_class_validate)
3366        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3367        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3368
3369    def test_custom_formatter_class_with_validate2(self):
3370        self.apply_config(self.custom_formatter_class_validate2)
3371        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3372        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3373
3374    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
3375        config = self.custom_formatter_class_validate.copy()
3376        config['formatters']['form1']['style'] = "$"
3377
3378        # Exception should not be raise as we have configured 'validate' to False
3379        self.apply_config(config)
3380        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3381        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3382
3383    def test_custom_formatter_class_with_validate3(self):
3384        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)
3385
3386    def test_custom_formatter_function_with_validate(self):
3387        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)
3388
3389    def test_baseconfig(self):
3390        d = {
3391            'atuple': (1, 2, 3),
3392            'alist': ['a', 'b', 'c'],
3393            'adict': {'d': 'e', 'f': 3 },
3394            'nest1': ('g', ('h', 'i'), 'j'),
3395            'nest2': ['k', ['l', 'm'], 'n'],
3396            'nest3': ['o', 'cfg://alist', 'p'],
3397        }
3398        bc = logging.config.BaseConfigurator(d)
3399        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
3400        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
3401        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
3402        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
3403        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
3404        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
3405        v = bc.convert('cfg://nest3')
3406        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
3407        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
3408        self.assertRaises(ValueError, bc.convert, 'cfg://!')
3409        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
3410
3411    def test_namedtuple(self):
3412        # see bpo-39142
3413        from collections import namedtuple
3414
3415        class MyHandler(logging.StreamHandler):
3416            def __init__(self, resource, *args, **kwargs):
3417                super().__init__(*args, **kwargs)
3418                self.resource: namedtuple = resource
3419
3420            def emit(self, record):
3421                record.msg += f' {self.resource.type}'
3422                return super().emit(record)
3423
3424        Resource = namedtuple('Resource', ['type', 'labels'])
3425        resource = Resource(type='my_type', labels=['a'])
3426
3427        config = {
3428            'version': 1,
3429            'handlers': {
3430                'myhandler': {
3431                    '()': MyHandler,
3432                    'resource': resource
3433                }
3434            },
3435            'root':  {'level': 'INFO', 'handlers': ['myhandler']},
3436        }
3437        with support.captured_stderr() as stderr:
3438            self.apply_config(config)
3439            logging.info('some log')
3440        self.assertEqual(stderr.getvalue(), 'some log my_type\n')
3441
3442class ManagerTest(BaseTest):
3443    def test_manager_loggerclass(self):
3444        logged = []
3445
3446        class MyLogger(logging.Logger):
3447            def _log(self, level, msg, args, exc_info=None, extra=None):
3448                logged.append(msg)
3449
3450        man = logging.Manager(None)
3451        self.assertRaises(TypeError, man.setLoggerClass, int)
3452        man.setLoggerClass(MyLogger)
3453        logger = man.getLogger('test')
3454        logger.warning('should appear in logged')
3455        logging.warning('should not appear in logged')
3456
3457        self.assertEqual(logged, ['should appear in logged'])
3458
3459    def test_set_log_record_factory(self):
3460        man = logging.Manager(None)
3461        expected = object()
3462        man.setLogRecordFactory(expected)
3463        self.assertEqual(man.logRecordFactory, expected)
3464
3465class ChildLoggerTest(BaseTest):
3466    def test_child_loggers(self):
3467        r = logging.getLogger()
3468        l1 = logging.getLogger('abc')
3469        l2 = logging.getLogger('def.ghi')
3470        c1 = r.getChild('xyz')
3471        c2 = r.getChild('uvw.xyz')
3472        self.assertIs(c1, logging.getLogger('xyz'))
3473        self.assertIs(c2, logging.getLogger('uvw.xyz'))
3474        c1 = l1.getChild('def')
3475        c2 = c1.getChild('ghi')
3476        c3 = l1.getChild('def.ghi')
3477        self.assertIs(c1, logging.getLogger('abc.def'))
3478        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
3479        self.assertIs(c2, c3)
3480
3481
3482class DerivedLogRecord(logging.LogRecord):
3483    pass
3484
3485class LogRecordFactoryTest(BaseTest):
3486
3487    def setUp(self):
3488        class CheckingFilter(logging.Filter):
3489            def __init__(self, cls):
3490                self.cls = cls
3491
3492            def filter(self, record):
3493                t = type(record)
3494                if t is not self.cls:
3495                    msg = 'Unexpected LogRecord type %s, expected %s' % (t,
3496                            self.cls)
3497                    raise TypeError(msg)
3498                return True
3499
3500        BaseTest.setUp(self)
3501        self.filter = CheckingFilter(DerivedLogRecord)
3502        self.root_logger.addFilter(self.filter)
3503        self.orig_factory = logging.getLogRecordFactory()
3504
3505    def tearDown(self):
3506        self.root_logger.removeFilter(self.filter)
3507        BaseTest.tearDown(self)
3508        logging.setLogRecordFactory(self.orig_factory)
3509
3510    def test_logrecord_class(self):
3511        self.assertRaises(TypeError, self.root_logger.warning,
3512                          self.next_message())
3513        logging.setLogRecordFactory(DerivedLogRecord)
3514        self.root_logger.error(self.next_message())
3515        self.assert_log_lines([
3516           ('root', 'ERROR', '2'),
3517        ])
3518
3519
3520class QueueHandlerTest(BaseTest):
3521    # Do not bother with a logger name group.
3522    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
3523
3524    def setUp(self):
3525        BaseTest.setUp(self)
3526        self.queue = queue.Queue(-1)
3527        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
3528        self.name = 'que'
3529        self.que_logger = logging.getLogger('que')
3530        self.que_logger.propagate = False
3531        self.que_logger.setLevel(logging.WARNING)
3532        self.que_logger.addHandler(self.que_hdlr)
3533
3534    def tearDown(self):
3535        self.que_hdlr.close()
3536        BaseTest.tearDown(self)
3537
3538    def test_queue_handler(self):
3539        self.que_logger.debug(self.next_message())
3540        self.assertRaises(queue.Empty, self.queue.get_nowait)
3541        self.que_logger.info(self.next_message())
3542        self.assertRaises(queue.Empty, self.queue.get_nowait)
3543        msg = self.next_message()
3544        self.que_logger.warning(msg)
3545        data = self.queue.get_nowait()
3546        self.assertTrue(isinstance(data, logging.LogRecord))
3547        self.assertEqual(data.name, self.que_logger.name)
3548        self.assertEqual((data.msg, data.args), (msg, None))
3549
3550    def test_formatting(self):
3551        msg = self.next_message()
3552        levelname = logging.getLevelName(logging.WARNING)
3553        log_format_str = '{name} -> {levelname}: {message}'
3554        formatted_msg = log_format_str.format(name=self.name,
3555                                              levelname=levelname, message=msg)
3556        formatter = logging.Formatter(self.log_format)
3557        self.que_hdlr.setFormatter(formatter)
3558        self.que_logger.warning(msg)
3559        log_record = self.queue.get_nowait()
3560        self.assertEqual(formatted_msg, log_record.msg)
3561        self.assertEqual(formatted_msg, log_record.message)
3562
3563    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3564                         'logging.handlers.QueueListener required for this test')
3565    def test_queue_listener(self):
3566        handler = TestHandler(support.Matcher())
3567        listener = logging.handlers.QueueListener(self.queue, handler)
3568        listener.start()
3569        try:
3570            self.que_logger.warning(self.next_message())
3571            self.que_logger.error(self.next_message())
3572            self.que_logger.critical(self.next_message())
3573        finally:
3574            listener.stop()
3575        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
3576        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
3577        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
3578        handler.close()
3579
3580        # Now test with respect_handler_level set
3581
3582        handler = TestHandler(support.Matcher())
3583        handler.setLevel(logging.CRITICAL)
3584        listener = logging.handlers.QueueListener(self.queue, handler,
3585                                                  respect_handler_level=True)
3586        listener.start()
3587        try:
3588            self.que_logger.warning(self.next_message())
3589            self.que_logger.error(self.next_message())
3590            self.que_logger.critical(self.next_message())
3591        finally:
3592            listener.stop()
3593        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
3594        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
3595        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
3596        handler.close()
3597
3598    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3599                         'logging.handlers.QueueListener required for this test')
3600    def test_queue_listener_with_StreamHandler(self):
3601        # Test that traceback only appends once (bpo-34334).
3602        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
3603        listener.start()
3604        try:
3605            1 / 0
3606        except ZeroDivisionError as e:
3607            exc = e
3608            self.que_logger.exception(self.next_message(), exc_info=exc)
3609        listener.stop()
3610        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
3611
3612    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3613                         'logging.handlers.QueueListener required for this test')
3614    def test_queue_listener_with_multiple_handlers(self):
3615        # Test that queue handler format doesn't affect other handler formats (bpo-35726).
3616        self.que_hdlr.setFormatter(self.root_formatter)
3617        self.que_logger.addHandler(self.root_hdlr)
3618
3619        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
3620        listener.start()
3621        self.que_logger.error("error")
3622        listener.stop()
3623        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
3624
3625if hasattr(logging.handlers, 'QueueListener'):
3626    import multiprocessing
3627    from unittest.mock import patch
3628
3629    class QueueListenerTest(BaseTest):
3630        """
3631        Tests based on patch submitted for issue #27930. Ensure that
3632        QueueListener handles all log messages.
3633        """
3634
3635        repeat = 20
3636
3637        @staticmethod
3638        def setup_and_log(log_queue, ident):
3639            """
3640            Creates a logger with a QueueHandler that logs to a queue read by a
3641            QueueListener. Starts the listener, logs five messages, and stops
3642            the listener.
3643            """
3644            logger = logging.getLogger('test_logger_with_id_%s' % ident)
3645            logger.setLevel(logging.DEBUG)
3646            handler = logging.handlers.QueueHandler(log_queue)
3647            logger.addHandler(handler)
3648            listener = logging.handlers.QueueListener(log_queue)
3649            listener.start()
3650
3651            logger.info('one')
3652            logger.info('two')
3653            logger.info('three')
3654            logger.info('four')
3655            logger.info('five')
3656
3657            listener.stop()
3658            logger.removeHandler(handler)
3659            handler.close()
3660
3661        @patch.object(logging.handlers.QueueListener, 'handle')
3662        def test_handle_called_with_queue_queue(self, mock_handle):
3663            for i in range(self.repeat):
3664                log_queue = queue.Queue()
3665                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3666            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3667                             'correct number of handled log messages')
3668
3669        @patch.object(logging.handlers.QueueListener, 'handle')
3670        def test_handle_called_with_mp_queue(self, mock_handle):
3671            # bpo-28668: The multiprocessing (mp) module is not functional
3672            # when the mp.synchronize module cannot be imported.
3673            support.skip_if_broken_multiprocessing_synchronize()
3674            for i in range(self.repeat):
3675                log_queue = multiprocessing.Queue()
3676                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3677                log_queue.close()
3678                log_queue.join_thread()
3679            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3680                             'correct number of handled log messages')
3681
3682        @staticmethod
3683        def get_all_from_queue(log_queue):
3684            try:
3685                while True:
3686                    yield log_queue.get_nowait()
3687            except queue.Empty:
3688                return []
3689
3690        def test_no_messages_in_queue_after_stop(self):
3691            """
3692            Five messages are logged then the QueueListener is stopped. This
3693            test then gets everything off the queue. Failure of this test
3694            indicates that messages were not registered on the queue until
3695            _after_ the QueueListener stopped.
3696            """
3697            # bpo-28668: The multiprocessing (mp) module is not functional
3698            # when the mp.synchronize module cannot be imported.
3699            support.skip_if_broken_multiprocessing_synchronize()
3700            for i in range(self.repeat):
3701                queue = multiprocessing.Queue()
3702                self.setup_and_log(queue, '%s_%s' %(self.id(), i))
3703                # time.sleep(1)
3704                items = list(self.get_all_from_queue(queue))
3705                queue.close()
3706                queue.join_thread()
3707
3708                expected = [[], [logging.handlers.QueueListener._sentinel]]
3709                self.assertIn(items, expected,
3710                              'Found unexpected messages in queue: %s' % (
3711                                    [m.msg if isinstance(m, logging.LogRecord)
3712                                     else m for m in items]))
3713
3714        def test_calls_task_done_after_stop(self):
3715            # Issue 36813: Make sure queue.join does not deadlock.
3716            log_queue = queue.Queue()
3717            listener = logging.handlers.QueueListener(log_queue)
3718            listener.start()
3719            listener.stop()
3720            with self.assertRaises(ValueError):
3721                # Make sure all tasks are done and .join won't block.
3722                log_queue.task_done()
3723
3724
3725ZERO = datetime.timedelta(0)
3726
3727class UTC(datetime.tzinfo):
3728    def utcoffset(self, dt):
3729        return ZERO
3730
3731    dst = utcoffset
3732
3733    def tzname(self, dt):
3734        return 'UTC'
3735
3736utc = UTC()
3737
3738class AssertErrorMessage:
3739
3740    def assert_error_message(self, exception, message, *args, **kwargs):
3741        try:
3742            self.assertRaises((), *args, **kwargs)
3743        except exception as e:
3744            self.assertEqual(message, str(e))
3745
3746class FormatterTest(unittest.TestCase, AssertErrorMessage):
3747    def setUp(self):
3748        self.common = {
3749            'name': 'formatter.test',
3750            'level': logging.DEBUG,
3751            'pathname': os.path.join('path', 'to', 'dummy.ext'),
3752            'lineno': 42,
3753            'exc_info': None,
3754            'func': None,
3755            'msg': 'Message with %d %s',
3756            'args': (2, 'placeholders'),
3757        }
3758        self.variants = {
3759            'custom': {
3760                'custom': 1234
3761            }
3762        }
3763
3764    def get_record(self, name=None):
3765        result = dict(self.common)
3766        if name is not None:
3767            result.update(self.variants[name])
3768        return logging.makeLogRecord(result)
3769
3770    def test_percent(self):
3771        # Test %-formatting
3772        r = self.get_record()
3773        f = logging.Formatter('${%(message)s}')
3774        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
3775        f = logging.Formatter('%(random)s')
3776        self.assertRaises(ValueError, f.format, r)
3777        self.assertFalse(f.usesTime())
3778        f = logging.Formatter('%(asctime)s')
3779        self.assertTrue(f.usesTime())
3780        f = logging.Formatter('%(asctime)-15s')
3781        self.assertTrue(f.usesTime())
3782        f = logging.Formatter('%(asctime)#15s')
3783        self.assertTrue(f.usesTime())
3784
3785    def test_braces(self):
3786        # Test {}-formatting
3787        r = self.get_record()
3788        f = logging.Formatter('$%{message}%$', style='{')
3789        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3790        f = logging.Formatter('{random}', style='{')
3791        self.assertRaises(ValueError, f.format, r)
3792        f = logging.Formatter("{message}", style='{')
3793        self.assertFalse(f.usesTime())
3794        f = logging.Formatter('{asctime}', style='{')
3795        self.assertTrue(f.usesTime())
3796        f = logging.Formatter('{asctime!s:15}', style='{')
3797        self.assertTrue(f.usesTime())
3798        f = logging.Formatter('{asctime:15}', style='{')
3799        self.assertTrue(f.usesTime())
3800
3801    def test_dollars(self):
3802        # Test $-formatting
3803        r = self.get_record()
3804        f = logging.Formatter('${message}', style='$')
3805        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3806        f = logging.Formatter('$message', style='$')
3807        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3808        f = logging.Formatter('$$%${message}%$$', style='$')
3809        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3810        f = logging.Formatter('${random}', style='$')
3811        self.assertRaises(ValueError, f.format, r)
3812        self.assertFalse(f.usesTime())
3813        f = logging.Formatter('${asctime}', style='$')
3814        self.assertTrue(f.usesTime())
3815        f = logging.Formatter('$asctime', style='$')
3816        self.assertTrue(f.usesTime())
3817        f = logging.Formatter('${message}', style='$')
3818        self.assertFalse(f.usesTime())
3819        f = logging.Formatter('${asctime}--', style='$')
3820        self.assertTrue(f.usesTime())
3821
3822    def test_format_validate(self):
3823        # Check correct formatting
3824        # Percentage style
3825        f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3826        self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3827        f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3828        self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3829        f = logging.Formatter("%(process)#+027.23X")
3830        self.assertEqual(f._fmt, "%(process)#+027.23X")
3831        f = logging.Formatter("%(foo)#.*g")
3832        self.assertEqual(f._fmt, "%(foo)#.*g")
3833
3834        # StrFormat Style
3835        f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{")
3836        self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}")
3837        f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{")
3838        self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}")
3839        f = logging.Formatter("{customfield!s:#<30}", style="{")
3840        self.assertEqual(f._fmt, "{customfield!s:#<30}")
3841        f = logging.Formatter("{message!r}", style="{")
3842        self.assertEqual(f._fmt, "{message!r}")
3843        f = logging.Formatter("{message!s}", style="{")
3844        self.assertEqual(f._fmt, "{message!s}")
3845        f = logging.Formatter("{message!a}", style="{")
3846        self.assertEqual(f._fmt, "{message!a}")
3847        f = logging.Formatter("{process!r:4.2}", style="{")
3848        self.assertEqual(f._fmt, "{process!r:4.2}")
3849        f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{")
3850        self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}")
3851        f = logging.Formatter("{process!s:{w},.{p}}", style="{")
3852        self.assertEqual(f._fmt, "{process!s:{w},.{p}}")
3853        f = logging.Formatter("{foo:12.{p}}", style="{")
3854        self.assertEqual(f._fmt, "{foo:12.{p}}")
3855        f = logging.Formatter("{foo:{w}.6}", style="{")
3856        self.assertEqual(f._fmt, "{foo:{w}.6}")
3857        f = logging.Formatter("{foo[0].bar[1].baz}", style="{")
3858        self.assertEqual(f._fmt, "{foo[0].bar[1].baz}")
3859        f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{")
3860        self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}")
3861        f = logging.Formatter("{12[k1].bar[k2].baz}", style="{")
3862        self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}")
3863
3864        # Dollar style
3865        f = logging.Formatter("${asctime} - $message", style="$")
3866        self.assertEqual(f._fmt, "${asctime} - $message")
3867        f = logging.Formatter("$bar $$", style="$")
3868        self.assertEqual(f._fmt, "$bar $$")
3869        f = logging.Formatter("$bar $$$$", style="$")
3870        self.assertEqual(f._fmt, "$bar $$$$")  # this would print two $($$)
3871
3872        # Testing when ValueError being raised from incorrect format
3873        # Percentage Style
3874        self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z")
3875        self.assertRaises(ValueError, logging.Formatter, "%(asctime)b")
3876        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*")
3877        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s")
3878        self.assertRaises(ValueError, logging.Formatter, "%(asctime)_")
3879        self.assertRaises(ValueError, logging.Formatter, '{asctime}')
3880        self.assertRaises(ValueError, logging.Formatter, '${message}')
3881        self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f')  # with both * and decimal number as precision
3882        self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f')
3883
3884        # StrFormat Style
3885        # Testing failure for '-' in field name
3886        self.assert_error_message(
3887            ValueError,
3888            "invalid format: invalid field name/expression: 'name-thing'",
3889            logging.Formatter, "{name-thing}", style="{"
3890        )
3891        # Testing failure for style mismatch
3892        self.assert_error_message(
3893            ValueError,
3894            "invalid format: no fields",
3895            logging.Formatter, '%(asctime)s', style='{'
3896        )
3897        # Testing failure for invalid conversion
3898        self.assert_error_message(
3899            ValueError,
3900            "invalid conversion: 'Z'"
3901        )
3902        self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{')
3903        self.assert_error_message(
3904            ValueError,
3905            "invalid format: expected ':' after conversion specifier",
3906            logging.Formatter, '{asctime!aa:15}', style='{'
3907        )
3908        # Testing failure for invalid spec
3909        self.assert_error_message(
3910            ValueError,
3911            "invalid format: bad specifier: '.2ff'",
3912            logging.Formatter, '{process:.2ff}', style='{'
3913        )
3914        self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{')
3915        self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{')
3916        self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{')
3917        self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{')
3918        # Testing failure for mismatch braces
3919        self.assert_error_message(
3920            ValueError,
3921            "invalid format: expected '}' before end of string",
3922            logging.Formatter, '{process', style='{'
3923        )
3924        self.assert_error_message(
3925            ValueError,
3926            "invalid format: Single '}' encountered in format string",
3927            logging.Formatter, 'process}', style='{'
3928        )
3929        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{')
3930        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{')
3931        self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{')
3932        self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{')
3933        self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{')
3934        self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{')
3935        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{')
3936        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{')
3937        self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{')
3938
3939        # Dollar style
3940        # Testing failure for mismatch bare $
3941        self.assert_error_message(
3942            ValueError,
3943            "invalid format: bare \'$\' not allowed",
3944            logging.Formatter, '$bar $$$', style='$'
3945        )
3946        self.assert_error_message(
3947            ValueError,
3948            "invalid format: bare \'$\' not allowed",
3949            logging.Formatter, 'bar $', style='$'
3950        )
3951        self.assert_error_message(
3952            ValueError,
3953            "invalid format: bare \'$\' not allowed",
3954            logging.Formatter, 'foo $.', style='$'
3955        )
3956        # Testing failure for mismatch style
3957        self.assert_error_message(
3958            ValueError,
3959            "invalid format: no fields",
3960            logging.Formatter, '{asctime}', style='$'
3961        )
3962        self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$')
3963
3964        # Testing failure for incorrect fields
3965        self.assert_error_message(
3966            ValueError,
3967            "invalid format: no fields",
3968            logging.Formatter, 'foo', style='$'
3969        )
3970        self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$')
3971
3972    def test_defaults_parameter(self):
3973        fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message']
3974        styles = ['%', '{', '$']
3975        for fmt, style in zip(fmts, styles):
3976            f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'})
3977            r = self.get_record()
3978            self.assertEqual(f.format(r), 'Default Message with 2 placeholders')
3979            r = self.get_record("custom")
3980            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')
3981
3982            # Without default
3983            f = logging.Formatter(fmt, style=style)
3984            r = self.get_record()
3985            self.assertRaises(ValueError, f.format, r)
3986
3987            # Non-existing default is ignored
3988            f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'})
3989            r = self.get_record("custom")
3990            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')
3991
3992    def test_invalid_style(self):
3993        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
3994
3995    def test_time(self):
3996        r = self.get_record()
3997        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
3998        # We use None to indicate we want the local timezone
3999        # We're essentially converting a UTC time to local time
4000        r.created = time.mktime(dt.astimezone(None).timetuple())
4001        r.msecs = 123
4002        f = logging.Formatter('%(asctime)s %(message)s')
4003        f.converter = time.gmtime
4004        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
4005        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
4006        f.format(r)
4007        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
4008
4009    def test_default_msec_format_none(self):
4010        class NoMsecFormatter(logging.Formatter):
4011            default_msec_format = None
4012            default_time_format = '%d/%m/%Y %H:%M:%S'
4013
4014        r = self.get_record()
4015        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc)
4016        r.created = time.mktime(dt.astimezone(None).timetuple())
4017        f = NoMsecFormatter()
4018        f.converter = time.gmtime
4019        self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00')
4020
4021
4022class TestBufferingFormatter(logging.BufferingFormatter):
4023    def formatHeader(self, records):
4024        return '[(%d)' % len(records)
4025
4026    def formatFooter(self, records):
4027        return '(%d)]' % len(records)
4028
4029class BufferingFormatterTest(unittest.TestCase):
4030    def setUp(self):
4031        self.records = [
4032            logging.makeLogRecord({'msg': 'one'}),
4033            logging.makeLogRecord({'msg': 'two'}),
4034        ]
4035
4036    def test_default(self):
4037        f = logging.BufferingFormatter()
4038        self.assertEqual('', f.format([]))
4039        self.assertEqual('onetwo', f.format(self.records))
4040
4041    def test_custom(self):
4042        f = TestBufferingFormatter()
4043        self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
4044        lf = logging.Formatter('<%(message)s>')
4045        f = TestBufferingFormatter(lf)
4046        self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
4047
4048class ExceptionTest(BaseTest):
4049    def test_formatting(self):
4050        r = self.root_logger
4051        h = RecordingHandler()
4052        r.addHandler(h)
4053        try:
4054            raise RuntimeError('deliberate mistake')
4055        except:
4056            logging.exception('failed', stack_info=True)
4057        r.removeHandler(h)
4058        h.close()
4059        r = h.records[0]
4060        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
4061                                              'call last):\n'))
4062        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
4063                                            'deliberate mistake'))
4064        self.assertTrue(r.stack_info.startswith('Stack (most recent '
4065                                              'call last):\n'))
4066        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
4067                                            'stack_info=True)'))
4068
4069
4070class LastResortTest(BaseTest):
4071    def test_last_resort(self):
4072        # Test the last resort handler
4073        root = self.root_logger
4074        root.removeHandler(self.root_hdlr)
4075        old_lastresort = logging.lastResort
4076        old_raise_exceptions = logging.raiseExceptions
4077
4078        try:
4079            with support.captured_stderr() as stderr:
4080                root.debug('This should not appear')
4081                self.assertEqual(stderr.getvalue(), '')
4082                root.warning('Final chance!')
4083                self.assertEqual(stderr.getvalue(), 'Final chance!\n')
4084
4085            # No handlers and no last resort, so 'No handlers' message
4086            logging.lastResort = None
4087            with support.captured_stderr() as stderr:
4088                root.warning('Final chance!')
4089                msg = 'No handlers could be found for logger "root"\n'
4090                self.assertEqual(stderr.getvalue(), msg)
4091
4092            # 'No handlers' message only printed once
4093            with support.captured_stderr() as stderr:
4094                root.warning('Final chance!')
4095                self.assertEqual(stderr.getvalue(), '')
4096
4097            # If raiseExceptions is False, no message is printed
4098            root.manager.emittedNoHandlerWarning = False
4099            logging.raiseExceptions = False
4100            with support.captured_stderr() as stderr:
4101                root.warning('Final chance!')
4102                self.assertEqual(stderr.getvalue(), '')
4103        finally:
4104            root.addHandler(self.root_hdlr)
4105            logging.lastResort = old_lastresort
4106            logging.raiseExceptions = old_raise_exceptions
4107
4108
4109class FakeHandler:
4110
4111    def __init__(self, identifier, called):
4112        for method in ('acquire', 'flush', 'close', 'release'):
4113            setattr(self, method, self.record_call(identifier, method, called))
4114
4115    def record_call(self, identifier, method_name, called):
4116        def inner():
4117            called.append('{} - {}'.format(identifier, method_name))
4118        return inner
4119
4120
4121class RecordingHandler(logging.NullHandler):
4122
4123    def __init__(self, *args, **kwargs):
4124        super(RecordingHandler, self).__init__(*args, **kwargs)
4125        self.records = []
4126
4127    def handle(self, record):
4128        """Keep track of all the emitted records."""
4129        self.records.append(record)
4130
4131
4132class ShutdownTest(BaseTest):
4133
4134    """Test suite for the shutdown method."""
4135
4136    def setUp(self):
4137        super(ShutdownTest, self).setUp()
4138        self.called = []
4139
4140        raise_exceptions = logging.raiseExceptions
4141        self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
4142
4143    def raise_error(self, error):
4144        def inner():
4145            raise error()
4146        return inner
4147
4148    def test_no_failure(self):
4149        # create some fake handlers
4150        handler0 = FakeHandler(0, self.called)
4151        handler1 = FakeHandler(1, self.called)
4152        handler2 = FakeHandler(2, self.called)
4153
4154        # create live weakref to those handlers
4155        handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
4156
4157        logging.shutdown(handlerList=list(handlers))
4158
4159        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
4160                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
4161                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
4162        self.assertEqual(expected, self.called)
4163
4164    def _test_with_failure_in_method(self, method, error):
4165        handler = FakeHandler(0, self.called)
4166        setattr(handler, method, self.raise_error(error))
4167        handlers = [logging.weakref.ref(handler)]
4168
4169        logging.shutdown(handlerList=list(handlers))
4170
4171        self.assertEqual('0 - release', self.called[-1])
4172
4173    def test_with_ioerror_in_acquire(self):
4174        self._test_with_failure_in_method('acquire', OSError)
4175
4176    def test_with_ioerror_in_flush(self):
4177        self._test_with_failure_in_method('flush', OSError)
4178
4179    def test_with_ioerror_in_close(self):
4180        self._test_with_failure_in_method('close', OSError)
4181
4182    def test_with_valueerror_in_acquire(self):
4183        self._test_with_failure_in_method('acquire', ValueError)
4184
4185    def test_with_valueerror_in_flush(self):
4186        self._test_with_failure_in_method('flush', ValueError)
4187
4188    def test_with_valueerror_in_close(self):
4189        self._test_with_failure_in_method('close', ValueError)
4190
4191    def test_with_other_error_in_acquire_without_raise(self):
4192        logging.raiseExceptions = False
4193        self._test_with_failure_in_method('acquire', IndexError)
4194
4195    def test_with_other_error_in_flush_without_raise(self):
4196        logging.raiseExceptions = False
4197        self._test_with_failure_in_method('flush', IndexError)
4198
4199    def test_with_other_error_in_close_without_raise(self):
4200        logging.raiseExceptions = False
4201        self._test_with_failure_in_method('close', IndexError)
4202
4203    def test_with_other_error_in_acquire_with_raise(self):
4204        logging.raiseExceptions = True
4205        self.assertRaises(IndexError, self._test_with_failure_in_method,
4206                          'acquire', IndexError)
4207
4208    def test_with_other_error_in_flush_with_raise(self):
4209        logging.raiseExceptions = True
4210        self.assertRaises(IndexError, self._test_with_failure_in_method,
4211                          'flush', IndexError)
4212
4213    def test_with_other_error_in_close_with_raise(self):
4214        logging.raiseExceptions = True
4215        self.assertRaises(IndexError, self._test_with_failure_in_method,
4216                          'close', IndexError)
4217
4218
4219class ModuleLevelMiscTest(BaseTest):
4220
4221    """Test suite for some module level methods."""
4222
4223    def test_disable(self):
4224        old_disable = logging.root.manager.disable
4225        # confirm our assumptions are correct
4226        self.assertEqual(old_disable, 0)
4227        self.addCleanup(logging.disable, old_disable)
4228
4229        logging.disable(83)
4230        self.assertEqual(logging.root.manager.disable, 83)
4231
4232        self.assertRaises(ValueError, logging.disable, "doesnotexists")
4233
4234        class _NotAnIntOrString:
4235            pass
4236
4237        self.assertRaises(TypeError, logging.disable, _NotAnIntOrString())
4238
4239        logging.disable("WARN")
4240
4241        # test the default value introduced in 3.7
4242        # (Issue #28524)
4243        logging.disable()
4244        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)
4245
4246    def _test_log(self, method, level=None):
4247        called = []
4248        support.patch(self, logging, 'basicConfig',
4249                      lambda *a, **kw: called.append((a, kw)))
4250
4251        recording = RecordingHandler()
4252        logging.root.addHandler(recording)
4253
4254        log_method = getattr(logging, method)
4255        if level is not None:
4256            log_method(level, "test me: %r", recording)
4257        else:
4258            log_method("test me: %r", recording)
4259
4260        self.assertEqual(len(recording.records), 1)
4261        record = recording.records[0]
4262        self.assertEqual(record.getMessage(), "test me: %r" % recording)
4263
4264        expected_level = level if level is not None else getattr(logging, method.upper())
4265        self.assertEqual(record.levelno, expected_level)
4266
4267        # basicConfig was not called!
4268        self.assertEqual(called, [])
4269
4270    def test_log(self):
4271        self._test_log('log', logging.ERROR)
4272
4273    def test_debug(self):
4274        self._test_log('debug')
4275
4276    def test_info(self):
4277        self._test_log('info')
4278
4279    def test_warning(self):
4280        self._test_log('warning')
4281
4282    def test_error(self):
4283        self._test_log('error')
4284
4285    def test_critical(self):
4286        self._test_log('critical')
4287
4288    def test_set_logger_class(self):
4289        self.assertRaises(TypeError, logging.setLoggerClass, object)
4290
4291        class MyLogger(logging.Logger):
4292            pass
4293
4294        logging.setLoggerClass(MyLogger)
4295        self.assertEqual(logging.getLoggerClass(), MyLogger)
4296
4297        logging.setLoggerClass(logging.Logger)
4298        self.assertEqual(logging.getLoggerClass(), logging.Logger)
4299
4300    def test_subclass_logger_cache(self):
4301        # bpo-37258
4302        message = []
4303
4304        class MyLogger(logging.getLoggerClass()):
4305            def __init__(self, name='MyLogger', level=logging.NOTSET):
4306                super().__init__(name, level)
4307                message.append('initialized')
4308
4309        logging.setLoggerClass(MyLogger)
4310        logger = logging.getLogger('just_some_logger')
4311        self.assertEqual(message, ['initialized'])
4312        stream = io.StringIO()
4313        h = logging.StreamHandler(stream)
4314        logger.addHandler(h)
4315        try:
4316            logger.setLevel(logging.DEBUG)
4317            logger.debug("hello")
4318            self.assertEqual(stream.getvalue().strip(), "hello")
4319
4320            stream.truncate(0)
4321            stream.seek(0)
4322
4323            logger.setLevel(logging.INFO)
4324            logger.debug("hello")
4325            self.assertEqual(stream.getvalue(), "")
4326        finally:
4327            logger.removeHandler(h)
4328            h.close()
4329            logging.setLoggerClass(logging.Logger)
4330
4331    def test_logging_at_shutdown(self):
4332        # bpo-20037: Doing text I/O late at interpreter shutdown must not crash
4333        code = textwrap.dedent("""
4334            import logging
4335
4336            class A:
4337                def __del__(self):
4338                    try:
4339                        raise ValueError("some error")
4340                    except Exception:
4341                        logging.exception("exception in __del__")
4342
4343            a = A()
4344        """)
4345        rc, out, err = assert_python_ok("-c", code)
4346        err = err.decode()
4347        self.assertIn("exception in __del__", err)
4348        self.assertIn("ValueError: some error", err)
4349
4350    def test_logging_at_shutdown_open(self):
4351        # bpo-26789: FileHandler keeps a reference to the builtin open()
4352        # function to be able to open or reopen the file during Python
4353        # finalization.
4354        filename = os_helper.TESTFN
4355        self.addCleanup(os_helper.unlink, filename)
4356
4357        code = textwrap.dedent(f"""
4358            import builtins
4359            import logging
4360
4361            class A:
4362                def __del__(self):
4363                    logging.error("log in __del__")
4364
4365            # basicConfig() opens the file, but logging.shutdown() closes
4366            # it at Python exit. When A.__del__() is called,
4367            # FileHandler._open() must be called again to re-open the file.
4368            logging.basicConfig(filename={filename!r}, encoding="utf-8")
4369
4370            a = A()
4371
4372            # Simulate the Python finalization which removes the builtin
4373            # open() function.
4374            del builtins.open
4375        """)
4376        assert_python_ok("-c", code)
4377
4378        with open(filename, encoding="utf-8") as fp:
4379            self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__")
4380
4381    def test_recursion_error(self):
4382        # Issue 36272
4383        code = textwrap.dedent("""
4384            import logging
4385
4386            def rec():
4387                logging.error("foo")
4388                rec()
4389
4390            rec()
4391        """)
4392        rc, out, err = assert_python_failure("-c", code)
4393        err = err.decode()
4394        self.assertNotIn("Cannot recover from stack overflow.", err)
4395        self.assertEqual(rc, 1)
4396
4397
4398class LogRecordTest(BaseTest):
4399    def test_str_rep(self):
4400        r = logging.makeLogRecord({})
4401        s = str(r)
4402        self.assertTrue(s.startswith('<LogRecord: '))
4403        self.assertTrue(s.endswith('>'))
4404
4405    def test_dict_arg(self):
4406        h = RecordingHandler()
4407        r = logging.getLogger()
4408        r.addHandler(h)
4409        d = {'less' : 'more' }
4410        logging.warning('less is %(less)s', d)
4411        self.assertIs(h.records[0].args, d)
4412        self.assertEqual(h.records[0].message, 'less is more')
4413        r.removeHandler(h)
4414        h.close()
4415
4416    @staticmethod # pickled as target of child process in the following test
4417    def _extract_logrecord_process_name(key, logMultiprocessing, conn=None):
4418        prev_logMultiprocessing = logging.logMultiprocessing
4419        logging.logMultiprocessing = logMultiprocessing
4420        try:
4421            import multiprocessing as mp
4422            name = mp.current_process().name
4423
4424            r1 = logging.makeLogRecord({'msg': f'msg1_{key}'})
4425
4426            # https://bugs.python.org/issue45128
4427            with support.swap_item(sys.modules, 'multiprocessing', None):
4428                r2 = logging.makeLogRecord({'msg': f'msg2_{key}'})
4429
4430            results = {'processName'  : name,
4431                       'r1.processName': r1.processName,
4432                       'r2.processName': r2.processName,
4433                      }
4434        finally:
4435            logging.logMultiprocessing = prev_logMultiprocessing
4436        if conn:
4437            conn.send(results)
4438        else:
4439            return results
4440
4441    def test_multiprocessing(self):
4442        multiprocessing_imported = 'multiprocessing' in sys.modules
4443        try:
4444            # logMultiprocessing is True by default
4445            self.assertEqual(logging.logMultiprocessing, True)
4446
4447            LOG_MULTI_PROCESSING = True
4448            # When logMultiprocessing == True:
4449            # In the main process processName = 'MainProcess'
4450            r = logging.makeLogRecord({})
4451            self.assertEqual(r.processName, 'MainProcess')
4452
4453            results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING)
4454            self.assertEqual('MainProcess', results['processName'])
4455            self.assertEqual('MainProcess', results['r1.processName'])
4456            self.assertEqual('MainProcess', results['r2.processName'])
4457
4458            # In other processes, processName is correct when multiprocessing in imported,
4459            # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762).
4460            import multiprocessing
4461            parent_conn, child_conn = multiprocessing.Pipe()
4462            p = multiprocessing.Process(
4463                target=self._extract_logrecord_process_name,
4464                args=(2, LOG_MULTI_PROCESSING, child_conn,)
4465            )
4466            p.start()
4467            results = parent_conn.recv()
4468            self.assertNotEqual('MainProcess', results['processName'])
4469            self.assertEqual(results['processName'], results['r1.processName'])
4470            self.assertEqual('MainProcess', results['r2.processName'])
4471            p.join()
4472
4473        finally:
4474            if multiprocessing_imported:
4475                import multiprocessing
4476
4477    def test_optional(self):
4478        r = logging.makeLogRecord({})
4479        NOT_NONE = self.assertIsNotNone
4480        NOT_NONE(r.thread)
4481        NOT_NONE(r.threadName)
4482        NOT_NONE(r.process)
4483        NOT_NONE(r.processName)
4484        log_threads = logging.logThreads
4485        log_processes = logging.logProcesses
4486        log_multiprocessing = logging.logMultiprocessing
4487        try:
4488            logging.logThreads = False
4489            logging.logProcesses = False
4490            logging.logMultiprocessing = False
4491            r = logging.makeLogRecord({})
4492            NONE = self.assertIsNone
4493            NONE(r.thread)
4494            NONE(r.threadName)
4495            NONE(r.process)
4496            NONE(r.processName)
4497        finally:
4498            logging.logThreads = log_threads
4499            logging.logProcesses = log_processes
4500            logging.logMultiprocessing = log_multiprocessing
4501
4502class BasicConfigTest(unittest.TestCase):
4503
4504    """Test suite for logging.basicConfig."""
4505
4506    def setUp(self):
4507        super(BasicConfigTest, self).setUp()
4508        self.handlers = logging.root.handlers
4509        self.saved_handlers = logging._handlers.copy()
4510        self.saved_handler_list = logging._handlerList[:]
4511        self.original_logging_level = logging.root.level
4512        self.addCleanup(self.cleanup)
4513        logging.root.handlers = []
4514
4515    def tearDown(self):
4516        for h in logging.root.handlers[:]:
4517            logging.root.removeHandler(h)
4518            h.close()
4519        super(BasicConfigTest, self).tearDown()
4520
4521    def cleanup(self):
4522        setattr(logging.root, 'handlers', self.handlers)
4523        logging._handlers.clear()
4524        logging._handlers.update(self.saved_handlers)
4525        logging._handlerList[:] = self.saved_handler_list
4526        logging.root.setLevel(self.original_logging_level)
4527
4528    def test_no_kwargs(self):
4529        logging.basicConfig()
4530
4531        # handler defaults to a StreamHandler to sys.stderr
4532        self.assertEqual(len(logging.root.handlers), 1)
4533        handler = logging.root.handlers[0]
4534        self.assertIsInstance(handler, logging.StreamHandler)
4535        self.assertEqual(handler.stream, sys.stderr)
4536
4537        formatter = handler.formatter
4538        # format defaults to logging.BASIC_FORMAT
4539        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
4540        # datefmt defaults to None
4541        self.assertIsNone(formatter.datefmt)
4542        # style defaults to %
4543        self.assertIsInstance(formatter._style, logging.PercentStyle)
4544
4545        # level is not explicitly set
4546        self.assertEqual(logging.root.level, self.original_logging_level)
4547
4548    def test_strformatstyle(self):
4549        with support.captured_stdout() as output:
4550            logging.basicConfig(stream=sys.stdout, style="{")
4551            logging.error("Log an error")
4552            sys.stdout.seek(0)
4553            self.assertEqual(output.getvalue().strip(),
4554                "ERROR:root:Log an error")
4555
4556    def test_stringtemplatestyle(self):
4557        with support.captured_stdout() as output:
4558            logging.basicConfig(stream=sys.stdout, style="$")
4559            logging.error("Log an error")
4560            sys.stdout.seek(0)
4561            self.assertEqual(output.getvalue().strip(),
4562                "ERROR:root:Log an error")
4563
4564    def test_filename(self):
4565
4566        def cleanup(h1, h2, fn):
4567            h1.close()
4568            h2.close()
4569            os.remove(fn)
4570
4571        logging.basicConfig(filename='test.log', encoding='utf-8')
4572
4573        self.assertEqual(len(logging.root.handlers), 1)
4574        handler = logging.root.handlers[0]
4575        self.assertIsInstance(handler, logging.FileHandler)
4576
4577        expected = logging.FileHandler('test.log', 'a', encoding='utf-8')
4578        self.assertEqual(handler.stream.mode, expected.stream.mode)
4579        self.assertEqual(handler.stream.name, expected.stream.name)
4580        self.addCleanup(cleanup, handler, expected, 'test.log')
4581
4582    def test_filemode(self):
4583
4584        def cleanup(h1, h2, fn):
4585            h1.close()
4586            h2.close()
4587            os.remove(fn)
4588
4589        logging.basicConfig(filename='test.log', filemode='wb')
4590
4591        handler = logging.root.handlers[0]
4592        expected = logging.FileHandler('test.log', 'wb')
4593        self.assertEqual(handler.stream.mode, expected.stream.mode)
4594        self.addCleanup(cleanup, handler, expected, 'test.log')
4595
4596    def test_stream(self):
4597        stream = io.StringIO()
4598        self.addCleanup(stream.close)
4599        logging.basicConfig(stream=stream)
4600
4601        self.assertEqual(len(logging.root.handlers), 1)
4602        handler = logging.root.handlers[0]
4603        self.assertIsInstance(handler, logging.StreamHandler)
4604        self.assertEqual(handler.stream, stream)
4605
4606    def test_format(self):
4607        logging.basicConfig(format='%(asctime)s - %(message)s')
4608
4609        formatter = logging.root.handlers[0].formatter
4610        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')
4611
4612    def test_datefmt(self):
4613        logging.basicConfig(datefmt='bar')
4614
4615        formatter = logging.root.handlers[0].formatter
4616        self.assertEqual(formatter.datefmt, 'bar')
4617
4618    def test_style(self):
4619        logging.basicConfig(style='$')
4620
4621        formatter = logging.root.handlers[0].formatter
4622        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
4623
4624    def test_level(self):
4625        old_level = logging.root.level
4626        self.addCleanup(logging.root.setLevel, old_level)
4627
4628        logging.basicConfig(level=57)
4629        self.assertEqual(logging.root.level, 57)
4630        # Test that second call has no effect
4631        logging.basicConfig(level=58)
4632        self.assertEqual(logging.root.level, 57)
4633
4634    def test_incompatible(self):
4635        assertRaises = self.assertRaises
4636        handlers = [logging.StreamHandler()]
4637        stream = sys.stderr
4638        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4639                                                      stream=stream)
4640        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4641                                                      handlers=handlers)
4642        assertRaises(ValueError, logging.basicConfig, stream=stream,
4643                                                      handlers=handlers)
4644        # Issue 23207: test for invalid kwargs
4645        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
4646        # Should pop both filename and filemode even if filename is None
4647        logging.basicConfig(filename=None, filemode='a')
4648
4649    def test_handlers(self):
4650        handlers = [
4651            logging.StreamHandler(),
4652            logging.StreamHandler(sys.stdout),
4653            logging.StreamHandler(),
4654        ]
4655        f = logging.Formatter()
4656        handlers[2].setFormatter(f)
4657        logging.basicConfig(handlers=handlers)
4658        self.assertIs(handlers[0], logging.root.handlers[0])
4659        self.assertIs(handlers[1], logging.root.handlers[1])
4660        self.assertIs(handlers[2], logging.root.handlers[2])
4661        self.assertIsNotNone(handlers[0].formatter)
4662        self.assertIsNotNone(handlers[1].formatter)
4663        self.assertIs(handlers[2].formatter, f)
4664        self.assertIs(handlers[0].formatter, handlers[1].formatter)
4665
4666    def test_force(self):
4667        old_string_io = io.StringIO()
4668        new_string_io = io.StringIO()
4669        old_handlers = [logging.StreamHandler(old_string_io)]
4670        new_handlers = [logging.StreamHandler(new_string_io)]
4671        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
4672        logging.warning('warn')
4673        logging.info('info')
4674        logging.debug('debug')
4675        self.assertEqual(len(logging.root.handlers), 1)
4676        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
4677                            force=True)
4678        logging.warning('warn')
4679        logging.info('info')
4680        logging.debug('debug')
4681        self.assertEqual(len(logging.root.handlers), 1)
4682        self.assertEqual(old_string_io.getvalue().strip(),
4683                         'WARNING:root:warn')
4684        self.assertEqual(new_string_io.getvalue().strip(),
4685                         'WARNING:root:warn\nINFO:root:info')
4686
4687    def test_encoding(self):
4688        try:
4689            encoding = 'utf-8'
4690            logging.basicConfig(filename='test.log', encoding=encoding,
4691                                errors='strict',
4692                                format='%(message)s', level=logging.DEBUG)
4693
4694            self.assertEqual(len(logging.root.handlers), 1)
4695            handler = logging.root.handlers[0]
4696            self.assertIsInstance(handler, logging.FileHandler)
4697            self.assertEqual(handler.encoding, encoding)
4698            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4699        finally:
4700            handler.close()
4701            with open('test.log', encoding='utf-8') as f:
4702                data = f.read().strip()
4703            os.remove('test.log')
4704            self.assertEqual(data,
4705                             'The Øresund Bridge joins Copenhagen to Malmö')
4706
4707    def test_encoding_errors(self):
4708        try:
4709            encoding = 'ascii'
4710            logging.basicConfig(filename='test.log', encoding=encoding,
4711                                errors='ignore',
4712                                format='%(message)s', level=logging.DEBUG)
4713
4714            self.assertEqual(len(logging.root.handlers), 1)
4715            handler = logging.root.handlers[0]
4716            self.assertIsInstance(handler, logging.FileHandler)
4717            self.assertEqual(handler.encoding, encoding)
4718            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4719        finally:
4720            handler.close()
4721            with open('test.log', encoding='utf-8') as f:
4722                data = f.read().strip()
4723            os.remove('test.log')
4724            self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm')
4725
4726    def test_encoding_errors_default(self):
4727        try:
4728            encoding = 'ascii'
4729            logging.basicConfig(filename='test.log', encoding=encoding,
4730                                format='%(message)s', level=logging.DEBUG)
4731
4732            self.assertEqual(len(logging.root.handlers), 1)
4733            handler = logging.root.handlers[0]
4734            self.assertIsInstance(handler, logging.FileHandler)
4735            self.assertEqual(handler.encoding, encoding)
4736            self.assertEqual(handler.errors, 'backslashreplace')
4737            logging.debug('��: ☃️: The Øresund Bridge joins Copenhagen to Malmö')
4738        finally:
4739            handler.close()
4740            with open('test.log', encoding='utf-8') as f:
4741                data = f.read().strip()
4742            os.remove('test.log')
4743            self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund '
4744                                   r'Bridge joins Copenhagen to Malm\xf6')
4745
4746    def test_encoding_errors_none(self):
4747        # Specifying None should behave as 'strict'
4748        try:
4749            encoding = 'ascii'
4750            logging.basicConfig(filename='test.log', encoding=encoding,
4751                                errors=None,
4752                                format='%(message)s', level=logging.DEBUG)
4753
4754            self.assertEqual(len(logging.root.handlers), 1)
4755            handler = logging.root.handlers[0]
4756            self.assertIsInstance(handler, logging.FileHandler)
4757            self.assertEqual(handler.encoding, encoding)
4758            self.assertIsNone(handler.errors)
4759
4760            message = []
4761
4762            def dummy_handle_error(record):
4763                _, v, _ = sys.exc_info()
4764                message.append(str(v))
4765
4766            handler.handleError = dummy_handle_error
4767            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4768            self.assertTrue(message)
4769            self.assertIn("'ascii' codec can't encode "
4770                          "character '\\xd8' in position 4:", message[0])
4771        finally:
4772            handler.close()
4773            with open('test.log', encoding='utf-8') as f:
4774                data = f.read().strip()
4775            os.remove('test.log')
4776            # didn't write anything due to the encoding error
4777            self.assertEqual(data, r'')
4778
4779
4780    def _test_log(self, method, level=None):
4781        # logging.root has no handlers so basicConfig should be called
4782        called = []
4783
4784        old_basic_config = logging.basicConfig
4785        def my_basic_config(*a, **kw):
4786            old_basic_config()
4787            old_level = logging.root.level
4788            logging.root.setLevel(100)  # avoid having messages in stderr
4789            self.addCleanup(logging.root.setLevel, old_level)
4790            called.append((a, kw))
4791
4792        support.patch(self, logging, 'basicConfig', my_basic_config)
4793
4794        log_method = getattr(logging, method)
4795        if level is not None:
4796            log_method(level, "test me")
4797        else:
4798            log_method("test me")
4799
4800        # basicConfig was called with no arguments
4801        self.assertEqual(called, [((), {})])
4802
4803    def test_log(self):
4804        self._test_log('log', logging.WARNING)
4805
4806    def test_debug(self):
4807        self._test_log('debug')
4808
4809    def test_info(self):
4810        self._test_log('info')
4811
4812    def test_warning(self):
4813        self._test_log('warning')
4814
4815    def test_error(self):
4816        self._test_log('error')
4817
4818    def test_critical(self):
4819        self._test_log('critical')
4820
4821
4822class LoggerAdapterTest(unittest.TestCase):
4823    def setUp(self):
4824        super(LoggerAdapterTest, self).setUp()
4825        old_handler_list = logging._handlerList[:]
4826
4827        self.recording = RecordingHandler()
4828        self.logger = logging.root
4829        self.logger.addHandler(self.recording)
4830        self.addCleanup(self.logger.removeHandler, self.recording)
4831        self.addCleanup(self.recording.close)
4832
4833        def cleanup():
4834            logging._handlerList[:] = old_handler_list
4835
4836        self.addCleanup(cleanup)
4837        self.addCleanup(logging.shutdown)
4838        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
4839
4840    def test_exception(self):
4841        msg = 'testing exception: %r'
4842        exc = None
4843        try:
4844            1 / 0
4845        except ZeroDivisionError as e:
4846            exc = e
4847            self.adapter.exception(msg, self.recording)
4848
4849        self.assertEqual(len(self.recording.records), 1)
4850        record = self.recording.records[0]
4851        self.assertEqual(record.levelno, logging.ERROR)
4852        self.assertEqual(record.msg, msg)
4853        self.assertEqual(record.args, (self.recording,))
4854        self.assertEqual(record.exc_info,
4855                         (exc.__class__, exc, exc.__traceback__))
4856
4857    def test_exception_excinfo(self):
4858        try:
4859            1 / 0
4860        except ZeroDivisionError as e:
4861            exc = e
4862
4863        self.adapter.exception('exc_info test', exc_info=exc)
4864
4865        self.assertEqual(len(self.recording.records), 1)
4866        record = self.recording.records[0]
4867        self.assertEqual(record.exc_info,
4868                         (exc.__class__, exc, exc.__traceback__))
4869
4870    def test_critical(self):
4871        msg = 'critical test! %r'
4872        self.adapter.critical(msg, self.recording)
4873
4874        self.assertEqual(len(self.recording.records), 1)
4875        record = self.recording.records[0]
4876        self.assertEqual(record.levelno, logging.CRITICAL)
4877        self.assertEqual(record.msg, msg)
4878        self.assertEqual(record.args, (self.recording,))
4879
4880    def test_is_enabled_for(self):
4881        old_disable = self.adapter.logger.manager.disable
4882        self.adapter.logger.manager.disable = 33
4883        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
4884                        old_disable)
4885        self.assertFalse(self.adapter.isEnabledFor(32))
4886
4887    def test_has_handlers(self):
4888        self.assertTrue(self.adapter.hasHandlers())
4889
4890        for handler in self.logger.handlers:
4891            self.logger.removeHandler(handler)
4892
4893        self.assertFalse(self.logger.hasHandlers())
4894        self.assertFalse(self.adapter.hasHandlers())
4895
4896    def test_nested(self):
4897        class Adapter(logging.LoggerAdapter):
4898            prefix = 'Adapter'
4899
4900            def process(self, msg, kwargs):
4901                return f"{self.prefix} {msg}", kwargs
4902
4903        msg = 'Adapters can be nested, yo.'
4904        adapter = Adapter(logger=self.logger, extra=None)
4905        adapter_adapter = Adapter(logger=adapter, extra=None)
4906        adapter_adapter.prefix = 'AdapterAdapter'
4907        self.assertEqual(repr(adapter), repr(adapter_adapter))
4908        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
4909        self.assertEqual(len(self.recording.records), 1)
4910        record = self.recording.records[0]
4911        self.assertEqual(record.levelno, logging.CRITICAL)
4912        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
4913        self.assertEqual(record.args, (self.recording,))
4914        orig_manager = adapter_adapter.manager
4915        self.assertIs(adapter.manager, orig_manager)
4916        self.assertIs(self.logger.manager, orig_manager)
4917        temp_manager = object()
4918        try:
4919            adapter_adapter.manager = temp_manager
4920            self.assertIs(adapter_adapter.manager, temp_manager)
4921            self.assertIs(adapter.manager, temp_manager)
4922            self.assertIs(self.logger.manager, temp_manager)
4923        finally:
4924            adapter_adapter.manager = orig_manager
4925        self.assertIs(adapter_adapter.manager, orig_manager)
4926        self.assertIs(adapter.manager, orig_manager)
4927        self.assertIs(self.logger.manager, orig_manager)
4928
4929
4930class LoggerTest(BaseTest, AssertErrorMessage):
4931
4932    def setUp(self):
4933        super(LoggerTest, self).setUp()
4934        self.recording = RecordingHandler()
4935        self.logger = logging.Logger(name='blah')
4936        self.logger.addHandler(self.recording)
4937        self.addCleanup(self.logger.removeHandler, self.recording)
4938        self.addCleanup(self.recording.close)
4939        self.addCleanup(logging.shutdown)
4940
4941    def test_set_invalid_level(self):
4942        self.assert_error_message(
4943            TypeError, 'Level not an integer or a valid string: None',
4944            self.logger.setLevel, None)
4945        self.assert_error_message(
4946            TypeError, 'Level not an integer or a valid string: (0, 0)',
4947            self.logger.setLevel, (0, 0))
4948
4949    def test_exception(self):
4950        msg = 'testing exception: %r'
4951        exc = None
4952        try:
4953            1 / 0
4954        except ZeroDivisionError as e:
4955            exc = e
4956            self.logger.exception(msg, self.recording)
4957
4958        self.assertEqual(len(self.recording.records), 1)
4959        record = self.recording.records[0]
4960        self.assertEqual(record.levelno, logging.ERROR)
4961        self.assertEqual(record.msg, msg)
4962        self.assertEqual(record.args, (self.recording,))
4963        self.assertEqual(record.exc_info,
4964                         (exc.__class__, exc, exc.__traceback__))
4965
4966    def test_log_invalid_level_with_raise(self):
4967        with support.swap_attr(logging, 'raiseExceptions', True):
4968            self.assertRaises(TypeError, self.logger.log, '10', 'test message')
4969
4970    def test_log_invalid_level_no_raise(self):
4971        with support.swap_attr(logging, 'raiseExceptions', False):
4972            self.logger.log('10', 'test message')  # no exception happens
4973
4974    def test_find_caller_with_stack_info(self):
4975        called = []
4976        support.patch(self, logging.traceback, 'print_stack',
4977                      lambda f, file: called.append(file.getvalue()))
4978
4979        self.logger.findCaller(stack_info=True)
4980
4981        self.assertEqual(len(called), 1)
4982        self.assertEqual('Stack (most recent call last):\n', called[0])
4983
4984    def test_find_caller_with_stacklevel(self):
4985        the_level = 1
4986
4987        def innermost():
4988            self.logger.warning('test', stacklevel=the_level)
4989
4990        def inner():
4991            innermost()
4992
4993        def outer():
4994            inner()
4995
4996        records = self.recording.records
4997        outer()
4998        self.assertEqual(records[-1].funcName, 'innermost')
4999        lineno = records[-1].lineno
5000        the_level += 1
5001        outer()
5002        self.assertEqual(records[-1].funcName, 'inner')
5003        self.assertGreater(records[-1].lineno, lineno)
5004        lineno = records[-1].lineno
5005        the_level += 1
5006        outer()
5007        self.assertEqual(records[-1].funcName, 'outer')
5008        self.assertGreater(records[-1].lineno, lineno)
5009        lineno = records[-1].lineno
5010        the_level += 1
5011        outer()
5012        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
5013        self.assertGreater(records[-1].lineno, lineno)
5014
5015    def test_make_record_with_extra_overwrite(self):
5016        name = 'my record'
5017        level = 13
5018        fn = lno = msg = args = exc_info = func = sinfo = None
5019        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
5020                                       exc_info, func, sinfo)
5021
5022        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
5023            extra = {key: 'some value'}
5024            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
5025                              fn, lno, msg, args, exc_info,
5026                              extra=extra, sinfo=sinfo)
5027
5028    def test_make_record_with_extra_no_overwrite(self):
5029        name = 'my record'
5030        level = 13
5031        fn = lno = msg = args = exc_info = func = sinfo = None
5032        extra = {'valid_key': 'some value'}
5033        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
5034                                        exc_info, extra=extra, sinfo=sinfo)
5035        self.assertIn('valid_key', result.__dict__)
5036
5037    def test_has_handlers(self):
5038        self.assertTrue(self.logger.hasHandlers())
5039
5040        for handler in self.logger.handlers:
5041            self.logger.removeHandler(handler)
5042        self.assertFalse(self.logger.hasHandlers())
5043
5044    def test_has_handlers_no_propagate(self):
5045        child_logger = logging.getLogger('blah.child')
5046        child_logger.propagate = False
5047        self.assertFalse(child_logger.hasHandlers())
5048
5049    def test_is_enabled_for(self):
5050        old_disable = self.logger.manager.disable
5051        self.logger.manager.disable = 23
5052        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
5053        self.assertFalse(self.logger.isEnabledFor(22))
5054
5055    def test_is_enabled_for_disabled_logger(self):
5056        old_disabled = self.logger.disabled
5057        old_disable = self.logger.manager.disable
5058
5059        self.logger.disabled = True
5060        self.logger.manager.disable = 21
5061
5062        self.addCleanup(setattr, self.logger, 'disabled', old_disabled)
5063        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
5064
5065        self.assertFalse(self.logger.isEnabledFor(22))
5066
5067    def test_root_logger_aliases(self):
5068        root = logging.getLogger()
5069        self.assertIs(root, logging.root)
5070        self.assertIs(root, logging.getLogger(None))
5071        self.assertIs(root, logging.getLogger(''))
5072        self.assertIs(root, logging.getLogger('root'))
5073        self.assertIs(root, logging.getLogger('foo').root)
5074        self.assertIs(root, logging.getLogger('foo.bar').root)
5075        self.assertIs(root, logging.getLogger('foo').parent)
5076
5077        self.assertIsNot(root, logging.getLogger('\0'))
5078        self.assertIsNot(root, logging.getLogger('foo.bar').parent)
5079
5080    def test_invalid_names(self):
5081        self.assertRaises(TypeError, logging.getLogger, any)
5082        self.assertRaises(TypeError, logging.getLogger, b'foo')
5083
5084    def test_pickling(self):
5085        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
5086            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
5087                logger = logging.getLogger(name)
5088                s = pickle.dumps(logger, proto)
5089                unpickled = pickle.loads(s)
5090                self.assertIs(unpickled, logger)
5091
5092    def test_caching(self):
5093        root = self.root_logger
5094        logger1 = logging.getLogger("abc")
5095        logger2 = logging.getLogger("abc.def")
5096
5097        # Set root logger level and ensure cache is empty
5098        root.setLevel(logging.ERROR)
5099        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
5100        self.assertEqual(logger2._cache, {})
5101
5102        # Ensure cache is populated and calls are consistent
5103        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
5104        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
5105        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
5106        self.assertEqual(root._cache, {})
5107        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
5108
5109        # Ensure root cache gets populated
5110        self.assertEqual(root._cache, {})
5111        self.assertTrue(root.isEnabledFor(logging.ERROR))
5112        self.assertEqual(root._cache, {logging.ERROR: True})
5113
5114        # Set parent logger level and ensure caches are emptied
5115        logger1.setLevel(logging.CRITICAL)
5116        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5117        self.assertEqual(logger2._cache, {})
5118
5119        # Ensure logger2 uses parent logger's effective level
5120        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
5121
5122        # Set level to NOTSET and ensure caches are empty
5123        logger2.setLevel(logging.NOTSET)
5124        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5125        self.assertEqual(logger2._cache, {})
5126        self.assertEqual(logger1._cache, {})
5127        self.assertEqual(root._cache, {})
5128
5129        # Verify logger2 follows parent and not root
5130        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
5131        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
5132        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
5133        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
5134        self.assertTrue(root.isEnabledFor(logging.ERROR))
5135
5136        # Disable logging in manager and ensure caches are clear
5137        logging.disable()
5138        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5139        self.assertEqual(logger2._cache, {})
5140        self.assertEqual(logger1._cache, {})
5141        self.assertEqual(root._cache, {})
5142
5143        # Ensure no loggers are enabled
5144        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
5145        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
5146        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
5147
5148
5149class BaseFileTest(BaseTest):
5150    "Base class for handler tests that write log files"
5151
5152    def setUp(self):
5153        BaseTest.setUp(self)
5154        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
5155        os.close(fd)
5156        self.rmfiles = []
5157
5158    def tearDown(self):
5159        for fn in self.rmfiles:
5160            os.unlink(fn)
5161        if os.path.exists(self.fn):
5162            os.unlink(self.fn)
5163        BaseTest.tearDown(self)
5164
5165    def assertLogFile(self, filename):
5166        "Assert a log file is there and register it for deletion"
5167        self.assertTrue(os.path.exists(filename),
5168                        msg="Log file %r does not exist" % filename)
5169        self.rmfiles.append(filename)
5170
5171    def next_rec(self):
5172        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
5173                                 self.next_message(), None, None, None)
5174
5175class FileHandlerTest(BaseFileTest):
5176    def test_delay(self):
5177        os.unlink(self.fn)
5178        fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True)
5179        self.assertIsNone(fh.stream)
5180        self.assertFalse(os.path.exists(self.fn))
5181        fh.handle(logging.makeLogRecord({}))
5182        self.assertIsNotNone(fh.stream)
5183        self.assertTrue(os.path.exists(self.fn))
5184        fh.close()
5185
5186    def test_emit_after_closing_in_write_mode(self):
5187        # Issue #42378
5188        os.unlink(self.fn)
5189        fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w')
5190        fh.setFormatter(logging.Formatter('%(message)s'))
5191        fh.emit(self.next_rec())    # '1'
5192        fh.close()
5193        fh.emit(self.next_rec())    # '2'
5194        with open(self.fn) as fp:
5195            self.assertEqual(fp.read().strip(), '1')
5196
5197class RotatingFileHandlerTest(BaseFileTest):
5198    def test_should_not_rollover(self):
5199        # If maxbytes is zero rollover never occurs
5200        rh = logging.handlers.RotatingFileHandler(
5201                self.fn, encoding="utf-8", maxBytes=0)
5202        self.assertFalse(rh.shouldRollover(None))
5203        rh.close()
5204        # bpo-45401 - test with special file
5205        # We set maxBytes to 1 so that rollover would normally happen, except
5206        # for the check for regular files
5207        rh = logging.handlers.RotatingFileHandler(
5208                os.devnull, encoding="utf-8", maxBytes=1)
5209        self.assertFalse(rh.shouldRollover(self.next_rec()))
5210        rh.close()
5211
5212    def test_should_rollover(self):
5213        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1)
5214        self.assertTrue(rh.shouldRollover(self.next_rec()))
5215        rh.close()
5216
5217    def test_file_created(self):
5218        # checks that the file is created and assumes it was created
5219        # by us
5220        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8")
5221        rh.emit(self.next_rec())
5222        self.assertLogFile(self.fn)
5223        rh.close()
5224
5225    def test_rollover_filenames(self):
5226        def namer(name):
5227            return name + ".test"
5228        rh = logging.handlers.RotatingFileHandler(
5229            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5230        rh.namer = namer
5231        rh.emit(self.next_rec())
5232        self.assertLogFile(self.fn)
5233        rh.emit(self.next_rec())
5234        self.assertLogFile(namer(self.fn + ".1"))
5235        rh.emit(self.next_rec())
5236        self.assertLogFile(namer(self.fn + ".2"))
5237        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
5238        rh.close()
5239
5240    def test_namer_rotator_inheritance(self):
5241        class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler):
5242            def namer(self, name):
5243                return name + ".test"
5244
5245            def rotator(self, source, dest):
5246                if os.path.exists(source):
5247                    os.replace(source, dest + ".rotated")
5248
5249        rh = HandlerWithNamerAndRotator(
5250            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5251        self.assertEqual(rh.namer(self.fn), self.fn + ".test")
5252        rh.emit(self.next_rec())
5253        self.assertLogFile(self.fn)
5254        rh.emit(self.next_rec())
5255        self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated")
5256        self.assertFalse(os.path.exists(rh.namer(self.fn + ".1")))
5257        rh.close()
5258
5259    @support.requires_zlib()
5260    def test_rotator(self):
5261        def namer(name):
5262            return name + ".gz"
5263
5264        def rotator(source, dest):
5265            with open(source, "rb") as sf:
5266                data = sf.read()
5267                compressed = zlib.compress(data, 9)
5268                with open(dest, "wb") as df:
5269                    df.write(compressed)
5270            os.remove(source)
5271
5272        rh = logging.handlers.RotatingFileHandler(
5273            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5274        rh.rotator = rotator
5275        rh.namer = namer
5276        m1 = self.next_rec()
5277        rh.emit(m1)
5278        self.assertLogFile(self.fn)
5279        m2 = self.next_rec()
5280        rh.emit(m2)
5281        fn = namer(self.fn + ".1")
5282        self.assertLogFile(fn)
5283        newline = os.linesep
5284        with open(fn, "rb") as f:
5285            compressed = f.read()
5286            data = zlib.decompress(compressed)
5287            self.assertEqual(data.decode("ascii"), m1.msg + newline)
5288        rh.emit(self.next_rec())
5289        fn = namer(self.fn + ".2")
5290        self.assertLogFile(fn)
5291        with open(fn, "rb") as f:
5292            compressed = f.read()
5293            data = zlib.decompress(compressed)
5294            self.assertEqual(data.decode("ascii"), m1.msg + newline)
5295        rh.emit(self.next_rec())
5296        fn = namer(self.fn + ".2")
5297        with open(fn, "rb") as f:
5298            compressed = f.read()
5299            data = zlib.decompress(compressed)
5300            self.assertEqual(data.decode("ascii"), m2.msg + newline)
5301        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
5302        rh.close()
5303
5304class TimedRotatingFileHandlerTest(BaseFileTest):
5305    def test_should_not_rollover(self):
5306        # See bpo-45401. Should only ever rollover regular files
5307        fh = logging.handlers.TimedRotatingFileHandler(
5308                os.devnull, 'S', encoding="utf-8", backupCount=1)
5309        time.sleep(1.1)    # a little over a second ...
5310        r = logging.makeLogRecord({'msg': 'testing - device file'})
5311        self.assertFalse(fh.shouldRollover(r))
5312        fh.close()
5313
5314    # other test methods added below
5315    def test_rollover(self):
5316        fh = logging.handlers.TimedRotatingFileHandler(
5317                self.fn, 'S', encoding="utf-8", backupCount=1)
5318        fmt = logging.Formatter('%(asctime)s %(message)s')
5319        fh.setFormatter(fmt)
5320        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
5321        fh.emit(r1)
5322        self.assertLogFile(self.fn)
5323        time.sleep(1.1)    # a little over a second ...
5324        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
5325        fh.emit(r2)
5326        fh.close()
5327        # At this point, we should have a recent rotated file which we
5328        # can test for the existence of. However, in practice, on some
5329        # machines which run really slowly, we don't know how far back
5330        # in time to go to look for the log file. So, we go back a fair
5331        # bit, and stop as soon as we see a rotated file. In theory this
5332        # could of course still fail, but the chances are lower.
5333        found = False
5334        now = datetime.datetime.now()
5335        GO_BACK = 5 * 60 # seconds
5336        for secs in range(GO_BACK):
5337            prev = now - datetime.timedelta(seconds=secs)
5338            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
5339            found = os.path.exists(fn)
5340            if found:
5341                self.rmfiles.append(fn)
5342                break
5343        msg = 'No rotated files found, went back %d seconds' % GO_BACK
5344        if not found:
5345            # print additional diagnostics
5346            dn, fn = os.path.split(self.fn)
5347            files = [f for f in os.listdir(dn) if f.startswith(fn)]
5348            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
5349            print('The only matching files are: %s' % files, file=sys.stderr)
5350            for f in files:
5351                print('Contents of %s:' % f)
5352                path = os.path.join(dn, f)
5353                with open(path, 'r') as tf:
5354                    print(tf.read())
5355        self.assertTrue(found, msg=msg)
5356
5357    def test_invalid(self):
5358        assertRaises = self.assertRaises
5359        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5360                     self.fn, 'X', encoding="utf-8", delay=True)
5361        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5362                     self.fn, 'W', encoding="utf-8", delay=True)
5363        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5364                     self.fn, 'W7', encoding="utf-8", delay=True)
5365
5366    def test_compute_rollover_daily_attime(self):
5367        currentTime = 0
5368        atTime = datetime.time(12, 0, 0)
5369        rh = logging.handlers.TimedRotatingFileHandler(
5370            self.fn, encoding="utf-8", when='MIDNIGHT', interval=1, backupCount=0,
5371            utc=True, atTime=atTime)
5372        try:
5373            actual = rh.computeRollover(currentTime)
5374            self.assertEqual(actual, currentTime + 12 * 60 * 60)
5375
5376            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
5377            self.assertEqual(actual, currentTime + 36 * 60 * 60)
5378        finally:
5379            rh.close()
5380
5381    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
5382    def test_compute_rollover_weekly_attime(self):
5383        currentTime = int(time.time())
5384        today = currentTime - currentTime % 86400
5385
5386        atTime = datetime.time(12, 0, 0)
5387
5388        wday = time.gmtime(today).tm_wday
5389        for day in range(7):
5390            rh = logging.handlers.TimedRotatingFileHandler(
5391                self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0,
5392                utc=True, atTime=atTime)
5393            try:
5394                if wday > day:
5395                    # The rollover day has already passed this week, so we
5396                    # go over into next week
5397                    expected = (7 - wday + day)
5398                else:
5399                    expected = (day - wday)
5400                # At this point expected is in days from now, convert to seconds
5401                expected *= 24 * 60 * 60
5402                # Add in the rollover time
5403                expected += 12 * 60 * 60
5404                # Add in adjustment for today
5405                expected += today
5406                actual = rh.computeRollover(today)
5407                if actual != expected:
5408                    print('failed in timezone: %d' % time.timezone)
5409                    print('local vars: %s' % locals())
5410                self.assertEqual(actual, expected)
5411                if day == wday:
5412                    # goes into following week
5413                    expected += 7 * 24 * 60 * 60
5414                actual = rh.computeRollover(today + 13 * 60 * 60)
5415                if actual != expected:
5416                    print('failed in timezone: %d' % time.timezone)
5417                    print('local vars: %s' % locals())
5418                self.assertEqual(actual, expected)
5419            finally:
5420                rh.close()
5421
5422    def test_compute_files_to_delete(self):
5423        # See bpo-46063 for background
5424        wd = tempfile.mkdtemp(prefix='test_logging_')
5425        self.addCleanup(shutil.rmtree, wd)
5426        times = []
5427        dt = datetime.datetime.now()
5428        for i in range(10):
5429            times.append(dt.strftime('%Y-%m-%d_%H-%M-%S'))
5430            dt += datetime.timedelta(seconds=5)
5431        prefixes = ('a.b', 'a.b.c', 'd.e', 'd.e.f')
5432        files = []
5433        rotators = []
5434        for prefix in prefixes:
5435            p = os.path.join(wd, '%s.log' % prefix)
5436            rotator = logging.handlers.TimedRotatingFileHandler(p, when='s',
5437                                                                interval=5,
5438                                                                backupCount=7,
5439                                                                delay=True)
5440            rotators.append(rotator)
5441            if prefix.startswith('a.b'):
5442                for t in times:
5443                    files.append('%s.log.%s' % (prefix, t))
5444            else:
5445                rotator.namer = lambda name: name.replace('.log', '') + '.log'
5446                for t in times:
5447                    files.append('%s.%s.log' % (prefix, t))
5448        # Create empty files
5449        for fn in files:
5450            p = os.path.join(wd, fn)
5451            with open(p, 'wb') as f:
5452                pass
5453        # Now the checks that only the correct files are offered up for deletion
5454        for i, prefix in enumerate(prefixes):
5455            rotator = rotators[i]
5456            candidates = rotator.getFilesToDelete()
5457            self.assertEqual(len(candidates), 3)
5458            if prefix.startswith('a.b'):
5459                p = '%s.log.' % prefix
5460                for c in candidates:
5461                    d, fn = os.path.split(c)
5462                    self.assertTrue(fn.startswith(p))
5463            else:
5464                for c in candidates:
5465                    d, fn = os.path.split(c)
5466                    self.assertTrue(fn.endswith('.log'))
5467                    self.assertTrue(fn.startswith(prefix + '.') and
5468                                    fn[len(prefix) + 2].isdigit())
5469
5470
5471def secs(**kw):
5472    return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
5473
5474for when, exp in (('S', 1),
5475                  ('M', 60),
5476                  ('H', 60 * 60),
5477                  ('D', 60 * 60 * 24),
5478                  ('MIDNIGHT', 60 * 60 * 24),
5479                  # current time (epoch start) is a Thursday, W0 means Monday
5480                  ('W0', secs(days=4, hours=24)),
5481                 ):
5482    def test_compute_rollover(self, when=when, exp=exp):
5483        rh = logging.handlers.TimedRotatingFileHandler(
5484            self.fn, encoding="utf-8", when=when, interval=1, backupCount=0, utc=True)
5485        currentTime = 0.0
5486        actual = rh.computeRollover(currentTime)
5487        if exp != actual:
5488            # Failures occur on some systems for MIDNIGHT and W0.
5489            # Print detailed calculation for MIDNIGHT so we can try to see
5490            # what's going on
5491            if when == 'MIDNIGHT':
5492                try:
5493                    if rh.utc:
5494                        t = time.gmtime(currentTime)
5495                    else:
5496                        t = time.localtime(currentTime)
5497                    currentHour = t[3]
5498                    currentMinute = t[4]
5499                    currentSecond = t[5]
5500                    # r is the number of seconds left between now and midnight
5501                    r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
5502                                                       currentMinute) * 60 +
5503                            currentSecond)
5504                    result = currentTime + r
5505                    print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
5506                    print('currentHour: %s' % currentHour, file=sys.stderr)
5507                    print('currentMinute: %s' % currentMinute, file=sys.stderr)
5508                    print('currentSecond: %s' % currentSecond, file=sys.stderr)
5509                    print('r: %s' % r, file=sys.stderr)
5510                    print('result: %s' % result, file=sys.stderr)
5511                except Exception:
5512                    print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
5513        self.assertEqual(exp, actual)
5514        rh.close()
5515    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
5516
5517
5518@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
5519class NTEventLogHandlerTest(BaseTest):
5520    def test_basic(self):
5521        logtype = 'Application'
5522        elh = win32evtlog.OpenEventLog(None, logtype)
5523        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
5524
5525        try:
5526            h = logging.handlers.NTEventLogHandler('test_logging')
5527        except pywintypes.error as e:
5528            if e.winerror == 5:  # access denied
5529                raise unittest.SkipTest('Insufficient privileges to run test')
5530            raise
5531
5532        r = logging.makeLogRecord({'msg': 'Test Log Message'})
5533        h.handle(r)
5534        h.close()
5535        # Now see if the event is recorded
5536        self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
5537        flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
5538                win32evtlog.EVENTLOG_SEQUENTIAL_READ
5539        found = False
5540        GO_BACK = 100
5541        events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
5542        for e in events:
5543            if e.SourceName != 'test_logging':
5544                continue
5545            msg = win32evtlogutil.SafeFormatMessage(e, logtype)
5546            if msg != 'Test Log Message\r\n':
5547                continue
5548            found = True
5549            break
5550        msg = 'Record not found in event log, went back %d records' % GO_BACK
5551        self.assertTrue(found, msg=msg)
5552
5553
5554class MiscTestCase(unittest.TestCase):
5555    def test__all__(self):
5556        not_exported = {
5557            'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe',
5558            'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
5559            'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root',
5560            'threading'}
5561        support.check__all__(self, logging, not_exported=not_exported)
5562
5563
5564# Set the locale to the platform-dependent default.  I have no idea
5565# why the test does this, but in any case we save the current locale
5566# first and restore it at the end.
5567def setUpModule():
5568    cm = support.run_with_locale('LC_ALL', '')
5569    cm.__enter__()
5570    unittest.addModuleCleanup(cm.__exit__, None, None, None)
5571
5572
5573if __name__ == "__main__":
5574    unittest.main()
5575