• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2001-2022 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2022 Vinay Sajip. All Rights Reserved.
20"""
21import logging
22import logging.handlers
23import logging.config
24
25import codecs
26import configparser
27import copy
28import datetime
29import pathlib
30import pickle
31import io
32import itertools
33import gc
34import json
35import os
36import queue
37import random
38import re
39import shutil
40import socket
41import struct
42import sys
43import tempfile
44from test.support.script_helper import assert_python_ok, assert_python_failure
45from test import support
46from test.support import import_helper
47from test.support import os_helper
48from test.support import socket_helper
49from test.support import threading_helper
50from test.support import warnings_helper
51from test.support import asyncore
52from test.support import smtpd
53from test.support.logging_helper import TestHandler
54import textwrap
55import threading
56import asyncio
57import time
58import unittest
59import warnings
60import weakref
61
62from http.server import HTTPServer, BaseHTTPRequestHandler
63from unittest.mock import patch
64from urllib.parse import urlparse, parse_qs
65from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
66                          ThreadingTCPServer, StreamRequestHandler)
67
68try:
69    import win32evtlog, win32evtlogutil, pywintypes
70except ImportError:
71    win32evtlog = win32evtlogutil = pywintypes = None
72
73try:
74    import zlib
75except ImportError:
76    pass
77
78
# gh-89363: Skip fork() test if Python is built with Address Sanitizer (ASAN)
# to work around a libasan race condition, dead lock in pthread_create().
skip_if_asan_fork = unittest.skipIf(
    support.HAVE_ASAN_FORK_BUG,
    "libasan has a pthread_create() dead lock related to thread+fork")
# Likewise skip fork-related tests on ThreadSanitizer builds.
skip_if_tsan_fork = unittest.skipIf(
    support.check_sanitizer(thread=True),
    "TSAN doesn't support threads after fork")
87
88
class BaseTest(unittest.TestCase):

    """Base class for logging tests."""

    # Format used by the root handler installed in setUp(); the captured
    # output is matched against expected_log_pat by assert_log_lines().
    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
    # Auto-incrementing counter backing next_message().
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        self._threading_key = threading_helper.threading_setup()

        # Snapshot the logging module's global state (handler registry,
        # logger dict, level name maps) so tearDown() can restore it exactly.
        logger_dict = logging.getLogger().manager.loggerDict
        with logging._lock:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = saved_loggers = logger_dict.copy()
            self.saved_name_to_level = logging._nameToLevel.copy()
            self.saved_level_to_name = logging._levelToName.copy()
            self.logger_states = logger_states = {}
            for name in saved_loggers:
                # Record each logger's 'disabled' flag (None if absent).
                logger_states[name] = getattr(saved_loggers[name],
                                              'disabled', None)

        # Set two unused loggers
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        # Install a StreamHandler on the root logger that writes to an
        # in-memory StringIO, so tests can inspect everything logged.
        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        # Sanity check: no handlers should be visible yet anywhere in the
        # hierarchy; a leftover handler means a previous test leaked state.
        if self.logger1.hasHandlers():
            hlist = self.logger1.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        if self.logger2.hasHandlers():
            hlist = self.logger2.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        self.root_logger.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        # Close and remove any handlers a test may have added to the root.
        while self.root_logger.handlers:
            h = self.root_logger.handlers[0]
            self.root_logger.removeHandler(h)
            h.close()
        self.root_logger.setLevel(self.original_logging_level)
        # Restore the global logging state captured in setUp().
        with logging._lock:
            logging._levelToName.clear()
            logging._levelToName.update(self.saved_level_to_name)
            logging._nameToLevel.clear()
            logging._nameToLevel.update(self.saved_name_to_level)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            manager = logging.getLogger().manager
            manager.disable = 0
            loggerDict = manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
            # Restore each pre-existing logger's 'disabled' flag.
            logger_states = self.logger_states
            for name in self.logger_states:
                if logger_states[name] is not None:
                    self.saved_loggers[name].disabled = logger_states[name]

        self.doCleanups()
        threading_helper.threading_cleanup(*self._threading_key)

    def assert_log_lines(self, expected_values, stream=None, pat=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples."""
        stream = stream or self.stream
        pat = re.compile(pat or self.expected_log_pat)
        actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                            actual)
            self.assertEqual(tuple(match.groups()), expected)
        # Anything left unread at the stream's current position is
        # unexpected output.
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num
190
191
class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        # Wrap one logger in a LoggerAdapter to check adapters honor levels.
        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR  = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR  = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        # Loggers with no explicit level inherit their effective level
        # from the nearest ancestor that has one set.
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        # Create the grandchild before its parent to exercise placeholder
        # loggers ("virtual parents") in the logger hierarchy.
        INF = logging.getLogger("INF")
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        # FATAL is an alias for CRITICAL, hence the level names below.
        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])

    def test_regression_22386(self):
        """See issue #22386 for more information."""
        # getLevelName() maps both directions: name -> number, number -> name.
        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')

    def test_issue27935(self):
        # 'FATAL' must resolve to the same value as logging.FATAL/CRITICAL.
        fatal = logging.getLevelName('FATAL')
        self.assertEqual(fatal, logging.FATAL)

    def test_regression_29220(self):
        """See issue #29220 for more information."""
        # An empty string is a legal level name and must round-trip.
        logging.addLevelName(logging.INFO, '')
        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
        self.assertEqual(logging.getLevelName(logging.INFO), '')
        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
342
class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def test_filter(self):
        # Only messages satisfying the specified criteria pass through the
        #  filter.
        filter_ = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filter_)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            # A Filter("spam.eggs") passes "spam.eggs" itself and its
            # descendants, but not siblings or ancestors.
            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filter_)

    def test_callable_filter(self):
        # Only messages satisfying the specified criteria pass through the
        #  filter.

        # A plain callable (not a Filter instance) is also accepted.
        def filterfunc(record):
            parts = record.name.split('.')
            prefix = '.'.join(parts[:2])
            return prefix == 'spam.eggs'

        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filterfunc)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filterfunc)

    def test_empty_filter(self):
        # A Filter with no name passes every record.
        f = logging.Filter()
        r = logging.makeLogRecord({'name': 'spam.eggs'})
        self.assertTrue(f.filter(r))
404
#
#   First, we define our levels. There can be as many as you want - the only
#     limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged. If you need specific
#   level values which do not fit into these limitations, you can use a
#   mapping dictionary to convert between your application levels and the
#   logging system.
#
SILENT      = 120
TACITURN    = 119
TERSE       = 118
EFFUSIVE    = 117
SOCIABLE    = 116
VERBOSE     = 115
TALKATIVE   = 114
GARRULOUS   = 113
CHATTERBOX  = 112
BORING      = 111

# All custom levels, from least to most severe (BORING..SILENT inclusive).
LEVEL_RANGE = range(BORING, SILENT + 1)

#
#   Next, we define names for our levels. You don't need to do this - in which
#   case the system will use "Level n" to denote the text for the level.
#
my_logging_levels = {
    SILENT      : 'Silent',
    TACITURN    : 'Taciturn',
    TERSE       : 'Terse',
    EFFUSIVE    : 'Effusive',
    SOCIABLE    : 'Sociable',
    VERBOSE     : 'Verbose',
    TALKATIVE   : 'Talkative',
    GARRULOUS   : 'Garrulous',
    CHATTERBOX  : 'Chatterbox',
    BORING      : 'Boring',
}
442
class GarrulousFilter(logging.Filter):

    """A filter which blocks garrulous messages."""

    def filter(self, record):
        # Reject records logged at exactly the GARRULOUS level; pass
        # everything else through unchanged.
        is_garrulous = (record.levelno == GARRULOUS)
        return not is_garrulous
449
class VerySpecificFilter(logging.Filter):

    """A filter which blocks sociable and taciturn messages."""

    def filter(self, record):
        # Drop any record logged at one of the two blocked levels.
        for blocked in (SOCIABLE, TACITURN):
            if record.levelno == blocked:
                return False
        return True
456
457
class CustomLevelsAndFiltersTest(BaseTest):

    """Test various filtering possibilities with custom logging levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        # Register readable names for the custom levels defined above.
        for k, v in my_logging_levels.items():
            logging.addLevelName(k, v)

    def log_at_all_levels(self, logger):
        # Emit one numbered message at every custom level, lowest first.
        for lvl in LEVEL_RANGE:
            logger.log(lvl, self.next_message())

    def test_handler_filter_replaces_record(self):
        # A filter may return a new LogRecord, replacing the original for
        # the remainder of that handler's processing only.
        def replace_message(record: logging.LogRecord):
            record = copy.copy(record)
            record.msg = "new message!"
            return record

        # Set up a logging hierarchy such that "child" and it's handler
        # (and thus `replace_message()`) always get called before
        # propagating up to "parent".
        # Then we can confirm that `replace_message()` was able to
        # replace the log record without having a side effect on
        # other loggers or handlers.
        parent = logging.getLogger("parent")
        child = logging.getLogger("parent.child")
        stream_1 = io.StringIO()
        stream_2 = io.StringIO()
        handler_1 = logging.StreamHandler(stream_1)
        handler_2 = logging.StreamHandler(stream_2)
        handler_2.addFilter(replace_message)
        parent.addHandler(handler_1)
        child.addHandler(handler_2)

        child.info("original message")
        handler_1.flush()
        handler_2.flush()
        self.assertEqual(stream_1.getvalue(), "original message\n")
        self.assertEqual(stream_2.getvalue(), "new message!\n")

    def test_logging_filter_replaces_record(self):
        records = set()

        class RecordingFilter(logging.Filter):
            def filter(self, record: logging.LogRecord):
                records.add(id(record))
                return copy.copy(record)

        logger = logging.getLogger("logger")
        logger.setLevel(logging.INFO)
        logger.addFilter(RecordingFilter())
        logger.addFilter(RecordingFilter())

        logger.info("msg")

        # The second filter must have seen the copy returned by the first,
        # hence two distinct record ids.
        self.assertEqual(2, len(records))

    def test_logger_filter(self):
        # Filter at logger level.
        self.root_logger.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(self.root_logger)
        self.assert_log_lines([
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ])

    def test_handler_filter(self):
        # Filter at handler level.
        self.root_logger.handlers[0].setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines([
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ])
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # Set a specific filter object on the handler, and then add another
        #  filter object on the logger itself.
        handler = self.root_logger.handlers[0]
        specific_filter = None
        garr = GarrulousFilter()
        handler.addFilter(garr)
        try:
            self.log_at_all_levels(self.root_logger)
            first_lines = [
                # Notice how 'Garrulous' is missing
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_lines)

            specific_filter = VerySpecificFilter()
            self.root_logger.addFilter(specific_filter)
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines(first_lines + [
                # Not only 'Garrulous' is still missing, but also 'Sociable'
                # and 'Taciturn'
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ])
        finally:
            if specific_filter:
                self.root_logger.removeFilter(specific_filter)
            handler.removeFilter(garr)
590
591
def make_temp_file(*args, **kwargs):
    """Create a temporary file, close its descriptor, and return its path.

    Arguments are forwarded verbatim to tempfile.mkstemp(); the caller is
    responsible for deleting the file.
    """
    handle, path = tempfile.mkstemp(*args, **kwargs)
    os.close(handle)
    return path
596
597
class HandlerTest(BaseTest):
    def test_name(self):
        # Handler.name is a settable property; setting it registers the
        # handler in logging's module-level handler map.
        h = logging.Handler()
        h.name = 'generic'
        self.assertEqual(h.name, 'generic')
        h.name = 'anothergeneric'
        self.assertEqual(h.name, 'anothergeneric')
        # The base class's emit() is abstract and must raise.
        self.assertRaises(NotImplementedError, h.emit, None)

    def test_builtin_handlers(self):
        # We can't actually *use* too many handlers in the tests,
        # but we can try instantiating them with various options
        if sys.platform in ('linux', 'android', 'darwin'):
            # WatchedFileHandler with delay=True must not open the file
            # (and hence not stat it) until the first record is handled.
            for existing in (True, False):
                fn = make_temp_file()
                if not existing:
                    os.unlink(fn)
                h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True)
                if existing:
                    dev, ino = h.dev, h.ino
                    self.assertEqual(dev, -1)
                    self.assertEqual(ino, -1)
                    r = logging.makeLogRecord({'msg': 'Test'})
                    h.handle(r)
                    # Now remove the file.
                    os.unlink(fn)
                    self.assertFalse(os.path.exists(fn))
                    # The next call should recreate the file.
                    h.handle(r)
                    self.assertTrue(os.path.exists(fn))
                else:
                    self.assertEqual(h.dev, -1)
                    self.assertEqual(h.ino, -1)
                h.close()
                if existing:
                    os.unlink(fn)
            if sys.platform == 'darwin':
                sockname = '/var/run/syslog'
            else:
                sockname = '/dev/log'
            try:
                h = logging.handlers.SysLogHandler(sockname)
                self.assertEqual(h.facility, h.LOG_USER)
                self.assertTrue(h.unixsocket)
                h.close()
            except OSError: # syslogd might not be available
                pass
        # HTTPHandler accepts only GET and POST.
        for method in ('GET', 'POST', 'PUT'):
            if method == 'PUT':
                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
                                  'localhost', '/log', method)
            else:
                h = logging.handlers.HTTPHandler('localhost', '/log', method)
                h.close()
        # BufferingHandler(0) has no capacity, so it should always flush;
        # with capacity 1 an empty buffer should not flush.
        h = logging.handlers.BufferingHandler(0)
        r = logging.makeLogRecord({})
        self.assertTrue(h.shouldFlush(r))
        h.close()
        h = logging.handlers.BufferingHandler(1)
        self.assertFalse(h.shouldFlush(r))
        h.close()

    def test_pathlike_objects(self):
        """
        Test that path-like objects are accepted as filename arguments to handlers.

        See Issue #27493.
        """
        fn = make_temp_file()
        os.unlink(fn)
        pfn = os_helper.FakePath(fn)
        cases = (
                    (logging.FileHandler, (pfn, 'w')),
                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
                )
        if sys.platform in ('linux', 'android', 'darwin'):
            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
        for cls, args in cases:
            # Instantiating the handler should create the file via the
            # path-like object.
            h = cls(*args, encoding="utf-8")
            self.assertTrue(os.path.exists(fn))
            h.close()
            os.unlink(fn)

    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot fstat unlinked files."
    )
    @threading_helper.requires_working_threading()
    @support.requires_resource('walltime')
    def test_race(self):
        # Issue #14632 refers.
        # A background thread repeatedly deletes the log file while the main
        # thread keeps handling records; WatchedFileHandler must recover.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                    self.deletion_time = time.time()
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        self.handle_time = None
        self.deletion_time = None

        for delay in (False, True):
            fn = make_temp_file('.log', 'test_logging-3-')
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing' })
                    try:
                        self.handle_time = time.time()
                        h.handle(r)
                    except Exception:
                        # Report timing context before re-raising to aid
                        # diagnosis of intermittent failures.
                        print('Deleted at %s, '
                              'opened at %s' % (self.deletion_time,
                                                self.handle_time))
                        raise
            finally:
                remover.join()
                h.close()
                if os.path.exists(fn):
                    os.unlink(fn)

    # The implementation relies on os.register_at_fork existing, but we test
    # based on os.fork existing because that is what users and this test use.
    # This helps ensure that when fork exists (the important concept) that the
    # register_at_fork mechanism is also present and used.
    @support.requires_fork()
    @threading_helper.requires_working_threading()
    @skip_if_asan_fork
    @skip_if_tsan_fork
    def test_post_fork_child_no_deadlock(self):
        """Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
        class _OurHandler(logging.Handler):
            def __init__(self):
                super().__init__()
                self.sub_handler = logging.StreamHandler(
                    stream=open('/dev/null', 'wt', encoding='utf-8'))

            def emit(self, record):
                with self.sub_handler.lock:
                    self.sub_handler.emit(record)

        self.assertEqual(len(logging._handlers), 0)
        refed_h = _OurHandler()
        self.addCleanup(refed_h.sub_handler.stream.close)
        refed_h.name = 'because we need at least one for this test'
        self.assertGreater(len(logging._handlers), 0)
        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
        test_logger.addHandler(refed_h)
        test_logger.setLevel(logging.DEBUG)

        locks_held__ready_to_fork = threading.Event()
        fork_happened__release_locks_and_end_thread = threading.Event()

        def lock_holder_thread_fn():
            # Hold both the module lock and the handler lock across the fork
            # to reproduce the deadlock scenario from bpo-6721/bpo-36533.
            with logging._lock, refed_h.lock:
                # Tell the main thread to do the fork.
                locks_held__ready_to_fork.set()

                # If the deadlock bug exists, the fork will happen
                # without dealing with the locks we hold, deadlocking
                # the child.

                # Wait for a successful fork or an unreasonable amount of
                # time before releasing our locks.  To avoid a timing based
                # test we'd need communication from os.fork() as to when it
                # has actually happened.  Given this is a regression test
                # for a fixed issue, potentially less reliably detecting
                # regression via timing is acceptable for simplicity.
                # The test will always take at least this long. :(
                fork_happened__release_locks_and_end_thread.wait(0.5)

        lock_holder_thread = threading.Thread(
                target=lock_holder_thread_fn,
                name='test_post_fork_child_no_deadlock lock holder')
        lock_holder_thread.start()

        locks_held__ready_to_fork.wait()
        pid = os.fork()
        if pid == 0:
            # Child process
            try:
                test_logger.info(r'Child process did not deadlock. \o/')
            finally:
                # Always exit the child without running test teardown.
                os._exit(0)
        else:
            # Parent process
            test_logger.info(r'Parent process returned from fork. \o/')
            fork_happened__release_locks_and_end_thread.set()
            lock_holder_thread.join()

            support.wait_process(pid, exitcode=0)
802
803
class BadStream(object):
    """A stream-like object whose write() always fails.

    Used to exercise handler error paths.
    """

    def write(self, data):
        # Simulate an unusable stream: every write attempt blows up.
        raise RuntimeError('deliberate mistake')
807
class TestStreamHandler(logging.StreamHandler):
    """StreamHandler variant that captures the record given to handleError()."""

    def handleError(self, record):
        # Instead of printing a traceback, remember the offending record
        # so tests can assert on it.
        self.error_record = record
811
class StreamWithIntName(object):
    """Fake stream whose ``name`` attribute is an int, for repr() testing."""

    # Attributes StreamHandler's repr() consults.
    name = 2
    level = logging.NOTSET
815
class StreamHandlerTest(BaseTest):
    def test_error_handling(self):
        # A stream whose write() raises should route the record through
        # handleError(); TestStreamHandler records it for inspection.
        h = TestStreamHandler(BadStream())
        r = logging.makeLogRecord({})
        old_raise = logging.raiseExceptions

        try:
            h.handle(r)
            self.assertIs(h.error_record, r)

            # With raiseExceptions enabled (the default), the stock
            # handleError() prints the traceback to stderr.
            h = logging.StreamHandler(BadStream())
            with support.captured_stderr() as stderr:
                h.handle(r)
                msg = '\nRuntimeError: deliberate mistake\n'
                self.assertIn(msg, stderr.getvalue())

            # With raiseExceptions disabled, errors are swallowed silently.
            logging.raiseExceptions = False
            with support.captured_stderr() as stderr:
                h.handle(r)
                self.assertEqual('', stderr.getvalue())
        finally:
            # Restore the module-level flag we toggled above.
            logging.raiseExceptions = old_raise

    def test_stream_setting(self):
        """
        Test setting the handler's stream
        """
        # setStream() returns the previous stream, or None when the new
        # stream is the same object as the current one.
        h = logging.StreamHandler()
        stream = io.StringIO()
        old = h.setStream(stream)
        self.assertIs(old, sys.stderr)
        actual = h.setStream(old)
        self.assertIs(actual, stream)
        # test that setting to existing value returns None
        actual = h.setStream(old)
        self.assertIsNone(actual)

    def test_can_represent_stream_with_int_name(self):
        # repr() must cope with a stream whose name is not a string.
        h = logging.StreamHandler(StreamWithIntName())
        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')
856
857# -- The following section could be moved into a server_helper.py module
858# -- if it proves to be of wider utility than just test_logging
859
class TestSMTPServer(smtpd.SMTPServer):
    """
    This class implements a test SMTP server.

    :param addr: A (host, port) tuple which the server listens on.
                 You can specify a port value of zero: the server's
                 *port* attribute will hold the actual port number
                 used, which can be used in client connections.
    :param handler: A callable which will be called to process
                    incoming messages. The handler will be passed
                    the client address tuple, who the message is from,
                    a list of recipients and the message data.
    :param poll_interval: The interval, in seconds, used in the underlying
                          :func:`select` or :func:`poll` call by
                          :func:`asyncore.loop`.
    :param sockmap: A dictionary which will be used to hold
                    :class:`asyncore.dispatcher` instances used by
                    :func:`asyncore.loop`. This avoids changing the
                    :mod:`asyncore` module's global state.
    """

    def __init__(self, addr, handler, poll_interval, sockmap):
        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
                                  decode_data=True)
        # A port of 0 in *addr* means "pick any free port"; publish the
        # one the OS actually assigned.
        self.port = self.socket.getsockname()[1]
        self._handler = handler
        self._thread = None
        self._quit = False
        self.poll_interval = poll_interval

    def process_message(self, peer, mailfrom, rcpttos, data):
        """
        Delegates to the handler passed in to the server's constructor.

        Typically, this will be a test case method.
        :param peer: The client (host, port) tuple.
        :param mailfrom: The address of the sender.
        :param rcpttos: The addresses of the recipients.
        :param data: The message.
        """
        self._handler(peer, mailfrom, rcpttos, data)

    def start(self):
        """
        Start the server running on a separate daemon thread.
        """
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        t.daemon = True
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the :mod:`asyncore` loop until normal termination
        conditions arise.
        :param poll_interval: The interval, in seconds, used in the underlying
                              :func:`select` or :func:`poll` call by
                              :func:`asyncore.loop`.
        """
        # count=1 makes each loop() call return after one poll, so the
        # _quit flag is re-checked at least every poll_interval seconds.
        while not self._quit:
            asyncore.loop(poll_interval, map=self._map, count=1)

    def stop(self):
        """
        Stop the thread by closing the server instance.
        Wait for the server thread to terminate.
        """
        # Set _quit before joining so serve_forever's loop exits on its
        # next iteration; only then tear down the sockets.
        self._quit = True
        threading_helper.join_thread(self._thread)
        self._thread = None
        self.close()
        # Close any remaining dispatchers registered in our private map.
        asyncore.close_all(map=self._map, ignore_all=True)
932
933
class ControlMixin(object):
    """
    Mixin that runs a server on a background thread and lets it be shut
    down programmatically. Request handling is simplified: rather than
    deriving a RequestHandler subclass, callers supply a callable which
    receives each incoming request. The callable runs on the server
    thread, so requests are processed serially - fine for testing.

    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval in seconds.
    """
    def __init__(self, handler, poll_interval):
        self.ready = threading.Event()
        self._handler = handler
        self._thread = None
        self.poll_interval = poll_interval

    def start(self):
        """
        Spawn a daemon thread that runs :meth:`serve_forever`.
        """
        thread = threading.Thread(target=self.serve_forever,
                                  args=(self.poll_interval,))
        thread.daemon = True
        self._thread = thread
        thread.start()

    def serve_forever(self, poll_interval):
        """
        Signal readiness, then enter the superclass's service loop.
        """
        self.ready.set()
        super().serve_forever(poll_interval)

    def stop(self):
        """
        Ask the server thread to stop, and wait until it has.
        """
        self.shutdown()
        if self._thread is not None:
            threading_helper.join_thread(self._thread)
            self._thread = None
        self.server_close()
        self.ready.clear()
983
class TestHTTPServer(ControlMixin, HTTPServer):
    """
    An HTTP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval in seconds.
    :param log: Pass ``True`` to enable log messages.
    :param sslctx: Optional SSL context; when given, accepted sockets
                   are wrapped with it on the server side.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 log=False, sslctx=None):
        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
            def __getattr__(self, name):
                # BaseHTTPRequestHandler dispatches to do_GET/do_POST/...
                # methods; answering every do_* lookup with
                # process_request lets one callable serve all HTTP verbs.
                # (Fix: removed an unused 'default' parameter - Python
                # only ever calls __getattr__ with the attribute name.)
                if name.startswith('do_'):
                    return self.process_request
                raise AttributeError(name)

            def process_request(self):
                # Delegate the whole request object to the server's handler.
                self.server._handler(self)

            def log_message(self, format, *args):
                # Suppress per-request logging unless log=True was given.
                if log:
                    super(DelegatingHTTPRequestHandler,
                          self).log_message(format, *args)
        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
        ControlMixin.__init__(self, handler, poll_interval)
        self.sslctx = sslctx

    def get_request(self):
        try:
            sock, addr = self.socket.accept()
            if self.sslctx:
                sock = self.sslctx.wrap_socket(sock, server_side=True)
        except OSError as e:
            # socket errors are silenced by the caller, print them here
            sys.stderr.write("Got an error:\n%s\n" % e)
            raise
        return sock, addr
1024
class TestTCPServer(ControlMixin, ThreadingTCPServer):
    """
    A TCP server controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a single
                    parameter - the request - in order to process the request.
    :param poll_interval: The polling interval in seconds.
    :bind_and_activate: If True (the default), binds the server and starts
                        it listening. If False, call :meth:`server_bind`
                        and :meth:`server_activate` yourself at some later
                        time before :meth:`start`, so that the server sets
                        up the socket and listens on it.
    """

    allow_reuse_address = True

    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingTCPRequestHandler(StreamRequestHandler):

            def handle(self):
                # Hand the whole request object to the server's callable.
                self.server._handler(self)

        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
                                    bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)

    def server_bind(self):
        # Record the port actually bound (supports asking for port 0).
        super().server_bind()
        self.port = self.socket.getsockname()[1]
1055
class TestUDPServer(ControlMixin, ThreadingUDPServer):
    """
    A UDP server controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds.
    :bind_and_activate: If True (the default), binds the server and
                        starts it listening. If False, call
                        :meth:`server_bind` and :meth:`server_activate`
                        at some later time before :meth:`start`, so that
                        the server sets up the socket and listens on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingUDPRequestHandler(DatagramRequestHandler):

            def handle(self):
                # Hand the whole request object to the server's callable.
                self.server._handler(self)

            def finish(self):
                data = self.wfile.getvalue()
                if data:
                    try:
                        super().finish()
                    except OSError:
                        # Ignore send failures only when the server has
                        # already been closed down.
                        if not self.server._closed:
                            raise

        ThreadingUDPServer.__init__(self, addr,
                                    DelegatingUDPRequestHandler,
                                    bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)
        self._closed = False

    def server_bind(self):
        # Record the port actually bound (supports asking for port 0).
        super().server_bind()
        self.port = self.socket.getsockname()[1]

    def server_close(self):
        super().server_close()
        self._closed = True
1102
if hasattr(socket, "AF_UNIX"):
    # Unix-domain variants of the TCP/UDP test servers; only defined on
    # platforms whose socket module exposes AF_UNIX.
    class TestUnixStreamServer(TestTCPServer):
        address_family = socket.AF_UNIX

    class TestUnixDatagramServer(TestUDPServer):
        address_family = socket.AF_UNIX
1109
1110# - end of server_helper section
1111
@support.requires_working_socket()
@threading_helper.requires_working_threading()
class SMTPHandlerTest(BaseTest):
    # bpo-14314, bpo-19665, bpo-34092: don't wait forever
    TIMEOUT = support.LONG_TIMEOUT

    def test_basic(self):
        """End-to-end check: an SMTPHandler delivers a record to a local
        TestSMTPServer with the expected envelope, subject and body."""
        sockmap = {}
        server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        addr = (socket_helper.HOST, server.port)
        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                         timeout=self.TIMEOUT)
        self.assertEqual(h.toaddrs, ['you'])
        self.messages = []
        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
        self.handled = threading.Event()
        h.handle(r)
        # Block until process_message() runs on the server thread (or
        # the timeout elapses - asserted below via is_set()).
        self.handled.wait(self.TIMEOUT)
        server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello \u2713'))
        h.close()

    def process_message(self, *args):
        # Runs on the SMTP server thread with (peer, mailfrom, rcpttos,
        # data); record the message and wake the waiting test.
        self.messages.append(args)
        self.handled.set()
1145
class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        # Buffer capacity 10, flush level WARNING, target = root handler.
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        #  criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        for n in (4, 14):
            for i in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            #  flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        # One more buffered message: below capacity and level, no flush.
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)

    def test_flush_on_close(self):
        """
        Test that the flush-on-close configuration works as expected.
        """
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.removeHandler(self.mem_hdlr)
        # Default behaviour is to flush on close. Check that it happens.
        self.mem_hdlr.close()
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
        ]
        self.assert_log_lines(lines)
        # Now configure for flushing not to be done on close.
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr,
                                                       False)
        self.mem_logger.addHandler(self.mem_hdlr)
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.info(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.removeHandler(self.mem_hdlr)
        self.mem_hdlr.close()
        # assert that no new lines have been added
        self.assert_log_lines(lines)  # no change

    def test_shutdown_flush_on_close(self):
        """
        Test that the flush-on-close configuration is respected by the
        shutdown method.
        """
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # Default behaviour is to flush on close. Check that it happens.
        logging.shutdown(handlerList=[logging.weakref.ref(self.mem_hdlr)])
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
        ]
        self.assert_log_lines(lines)
        # Now configure for flushing not to be done on close.
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr,
                                                       False)
        self.mem_logger.addHandler(self.mem_hdlr)
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.info(self.next_message())
        self.assert_log_lines(lines)  # no change
        # assert that no new lines have been added after shutdown
        logging.shutdown(handlerList=[logging.weakref.ref(self.mem_hdlr)])
        self.assert_log_lines(lines) # no change

    @threading_helper.requires_working_threading()
    def test_race_between_set_target_and_flush(self):
        # Regression test: setTarget(None) running concurrently with a
        # flush must not crash the handler.
        class MockRaceConditionHandler:
            def __init__(self, mem_hdlr):
                self.mem_hdlr = mem_hdlr
                self.threads = []

            def removeTarget(self):
                self.mem_hdlr.setTarget(None)

            def handle(self, msg):
                # Each handled record spawns a thread that clears the
                # target, racing with the in-progress flush.
                thread = threading.Thread(target=self.removeTarget)
                self.threads.append(thread)
                thread.start()

        target = MockRaceConditionHandler(self.mem_hdlr)
        try:
            self.mem_hdlr.setTarget(target)

            for _ in range(10):
                time.sleep(0.005)
                self.mem_logger.info("not flushed")
                self.mem_logger.warning("flushed")
        finally:
            for thread in target.threads:
                threading_helper.join_thread(thread)
1278
1279
class ExceptionFormatter(logging.Formatter):
    """Formatter whose exception text is just the exception class name."""
    def formatException(self, ei):
        # ei is a (type, value, traceback) triple; only the type matters.
        exc_type, _, _ = ei
        return "Got a [%s]" % exc_type.__name__
1284
def closeFileHandler(h, fn):
    """Close file handler *h*, then delete its underlying file *fn*."""
    # Close first so the file handle is released before unlinking.
    h.close()
    os.unlink(fn)
1288
1289class ConfigFileTest(BaseTest):
1290
1291    """Reading logging config from a .ini-style config file."""
1292
1293    check_no_resource_warning = warnings_helper.check_no_resource_warning
1294    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1295
1296    # config0 is a standard configuration.
1297    config0 = """
1298    [loggers]
1299    keys=root
1300
1301    [handlers]
1302    keys=hand1
1303
1304    [formatters]
1305    keys=form1
1306
1307    [logger_root]
1308    level=WARNING
1309    handlers=hand1
1310
1311    [handler_hand1]
1312    class=StreamHandler
1313    level=NOTSET
1314    formatter=form1
1315    args=(sys.stdout,)
1316
1317    [formatter_form1]
1318    format=%(levelname)s ++ %(message)s
1319    datefmt=
1320    """
1321
1322    # config1 adds a little to the standard configuration.
1323    config1 = """
1324    [loggers]
1325    keys=root,parser
1326
1327    [handlers]
1328    keys=hand1
1329
1330    [formatters]
1331    keys=form1
1332
1333    [logger_root]
1334    level=WARNING
1335    handlers=
1336
1337    [logger_parser]
1338    level=DEBUG
1339    handlers=hand1
1340    propagate=1
1341    qualname=compiler.parser
1342
1343    [handler_hand1]
1344    class=StreamHandler
1345    level=NOTSET
1346    formatter=form1
1347    args=(sys.stdout,)
1348
1349    [formatter_form1]
1350    format=%(levelname)s ++ %(message)s
1351    datefmt=
1352    """
1353
1354    # config1a moves the handler to the root.
1355    config1a = """
1356    [loggers]
1357    keys=root,parser
1358
1359    [handlers]
1360    keys=hand1
1361
1362    [formatters]
1363    keys=form1
1364
1365    [logger_root]
1366    level=WARNING
1367    handlers=hand1
1368
1369    [logger_parser]
1370    level=DEBUG
1371    handlers=
1372    propagate=1
1373    qualname=compiler.parser
1374
1375    [handler_hand1]
1376    class=StreamHandler
1377    level=NOTSET
1378    formatter=form1
1379    args=(sys.stdout,)
1380
1381    [formatter_form1]
1382    format=%(levelname)s ++ %(message)s
1383    datefmt=
1384    """
1385
1386    # config2 has a subtle configuration error that should be reported
1387    config2 = config1.replace("sys.stdout", "sys.stbout")
1388
1389    # config3 has a less subtle configuration error
1390    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1391
1392    # config4 specifies a custom formatter class to be loaded
1393    config4 = """
1394    [loggers]
1395    keys=root
1396
1397    [handlers]
1398    keys=hand1
1399
1400    [formatters]
1401    keys=form1
1402
1403    [logger_root]
1404    level=NOTSET
1405    handlers=hand1
1406
1407    [handler_hand1]
1408    class=StreamHandler
1409    level=NOTSET
1410    formatter=form1
1411    args=(sys.stdout,)
1412
1413    [formatter_form1]
1414    class=""" + __name__ + """.ExceptionFormatter
1415    format=%(levelname)s:%(name)s:%(message)s
1416    datefmt=
1417    """
1418
1419    # config5 specifies a custom handler class to be loaded
1420    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1421
1422    # config6 uses ', ' delimiters in the handlers and formatters sections
1423    config6 = """
1424    [loggers]
1425    keys=root,parser
1426
1427    [handlers]
1428    keys=hand1, hand2
1429
1430    [formatters]
1431    keys=form1, form2
1432
1433    [logger_root]
1434    level=WARNING
1435    handlers=
1436
1437    [logger_parser]
1438    level=DEBUG
1439    handlers=hand1
1440    propagate=1
1441    qualname=compiler.parser
1442
1443    [handler_hand1]
1444    class=StreamHandler
1445    level=NOTSET
1446    formatter=form1
1447    args=(sys.stdout,)
1448
1449    [handler_hand2]
1450    class=StreamHandler
1451    level=NOTSET
1452    formatter=form1
1453    args=(sys.stderr,)
1454
1455    [formatter_form1]
1456    format=%(levelname)s ++ %(message)s
1457    datefmt=
1458
1459    [formatter_form2]
1460    format=%(message)s
1461    datefmt=
1462    """
1463
1464    # config7 adds a compiler logger, and uses kwargs instead of args.
1465    config7 = """
1466    [loggers]
1467    keys=root,parser,compiler
1468
1469    [handlers]
1470    keys=hand1
1471
1472    [formatters]
1473    keys=form1
1474
1475    [logger_root]
1476    level=WARNING
1477    handlers=hand1
1478
1479    [logger_compiler]
1480    level=DEBUG
1481    handlers=
1482    propagate=1
1483    qualname=compiler
1484
1485    [logger_parser]
1486    level=DEBUG
1487    handlers=
1488    propagate=1
1489    qualname=compiler.parser
1490
1491    [handler_hand1]
1492    class=StreamHandler
1493    level=NOTSET
1494    formatter=form1
1495    kwargs={'stream': sys.stdout,}
1496
1497    [formatter_form1]
1498    format=%(levelname)s ++ %(message)s
1499    datefmt=
1500    """
1501
1502    # config 8, check for resource warning
1503    config8 = r"""
1504    [loggers]
1505    keys=root
1506
1507    [handlers]
1508    keys=file
1509
1510    [formatters]
1511    keys=
1512
1513    [logger_root]
1514    level=DEBUG
1515    handlers=file
1516
1517    [handler_file]
1518    class=FileHandler
1519    level=DEBUG
1520    args=("{tempfile}",)
1521    kwargs={{"encoding": "utf-8"}}
1522    """
1523
1524
1525    config9 = """
1526    [loggers]
1527    keys=root
1528
1529    [handlers]
1530    keys=hand1
1531
1532    [formatters]
1533    keys=form1
1534
1535    [logger_root]
1536    level=WARNING
1537    handlers=hand1
1538
1539    [handler_hand1]
1540    class=StreamHandler
1541    level=NOTSET
1542    formatter=form1
1543    args=(sys.stdout,)
1544
1545    [formatter_form1]
1546    format=%(message)s ++ %(customfield)s
1547    defaults={"customfield": "defaultvalue"}
1548    """
1549
1550    disable_test = """
1551    [loggers]
1552    keys=root
1553
1554    [handlers]
1555    keys=screen
1556
1557    [formatters]
1558    keys=
1559
1560    [logger_root]
1561    level=DEBUG
1562    handlers=screen
1563
1564    [handler_screen]
1565    level=DEBUG
1566    class=StreamHandler
1567    args=(sys.stdout,)
1568    formatter=
1569    """
1570
1571    def apply_config(self, conf, **kwargs):
1572        file = io.StringIO(textwrap.dedent(conf))
1573        logging.config.fileConfig(file, encoding="utf-8", **kwargs)
1574
1575    def test_config0_ok(self):
1576        # A simple config file which overrides the default settings.
1577        with support.captured_stdout() as output:
1578            self.apply_config(self.config0)
1579            logger = logging.getLogger()
1580            # Won't output anything
1581            logger.info(self.next_message())
1582            # Outputs a message
1583            logger.error(self.next_message())
1584            self.assert_log_lines([
1585                ('ERROR', '2'),
1586            ], stream=output)
1587            # Original logger output is empty.
1588            self.assert_log_lines([])
1589
1590    def test_config0_using_cp_ok(self):
1591        # A simple config file which overrides the default settings.
1592        with support.captured_stdout() as output:
1593            file = io.StringIO(textwrap.dedent(self.config0))
1594            cp = configparser.ConfigParser()
1595            cp.read_file(file)
1596            logging.config.fileConfig(cp)
1597            logger = logging.getLogger()
1598            # Won't output anything
1599            logger.info(self.next_message())
1600            # Outputs a message
1601            logger.error(self.next_message())
1602            self.assert_log_lines([
1603                ('ERROR', '2'),
1604            ], stream=output)
1605            # Original logger output is empty.
1606            self.assert_log_lines([])
1607
1608    def test_config1_ok(self, config=config1):
1609        # A config file defining a sub-parser as well.
1610        with support.captured_stdout() as output:
1611            self.apply_config(config)
1612            logger = logging.getLogger("compiler.parser")
1613            # Both will output a message
1614            logger.info(self.next_message())
1615            logger.error(self.next_message())
1616            self.assert_log_lines([
1617                ('INFO', '1'),
1618                ('ERROR', '2'),
1619            ], stream=output)
1620            # Original logger output is empty.
1621            self.assert_log_lines([])
1622
1623    def test_config2_failure(self):
1624        # A simple config file which overrides the default settings.
1625        self.assertRaises(Exception, self.apply_config, self.config2)
1626
1627    def test_config3_failure(self):
1628        # A simple config file which overrides the default settings.
1629        self.assertRaises(Exception, self.apply_config, self.config3)
1630
1631    def test_config4_ok(self):
1632        # A config file specifying a custom formatter class.
1633        with support.captured_stdout() as output:
1634            self.apply_config(self.config4)
1635            logger = logging.getLogger()
1636            try:
1637                raise RuntimeError()
1638            except RuntimeError:
1639                logging.exception("just testing")
1640            sys.stdout.seek(0)
1641            self.assertEqual(output.getvalue(),
1642                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1643            # Original logger output is empty
1644            self.assert_log_lines([])
1645
    def test_config5_ok(self):
        # Like config1, but the handler class is given as a dotted path
        # ('logging.StreamHandler') instead of a bare name.
        self.test_config1_ok(config=self.config5)
1648
    def test_config6_ok(self):
        # Like config1, but the handlers/formatters key lists use ', '
        # delimiters.
        self.test_config1_ok(config=self.config6)
1651
    def test_config7_ok(self):
        # Apply config1a, then config7: loggers present in the new config
        # keep working, while 'compiler-hyphenated' (absent from config7)
        # no longer emits.
        with support.captured_stdout() as output:
            self.apply_config(self.config1a)
            logger = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with support.captured_stdout() as output:
            self.apply_config(self.config7)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
1694
    def test_config8_ok(self):
        # Applying a FileHandler config twice must close the first
        # handler's file rather than leak it (no ResourceWarning).

        with self.check_no_resource_warning():
            fn = make_temp_file(".log", "test_logging-X-")

            # Replace single backslash with double backslash in windows
            # to avoid unicode error during string formatting
            if os.name == "nt":
                fn = fn.replace("\\", "\\\\")

            config8 = self.config8.format(tempfile=fn)

            self.apply_config(config8)
            self.apply_config(config8)

        handler = logging.root.handlers[0]
        self.addCleanup(closeFileHandler, handler, fn)
1712
1713    def test_config9_ok(self):
1714        self.apply_config(self.config9)
1715        formatter = logging.root.handlers[0].formatter
1716        result = formatter.format(logging.makeLogRecord({'msg': 'test'}))
1717        self.assertEqual(result, 'test ++ defaultvalue')
1718        result = formatter.format(logging.makeLogRecord(
1719            {'msg': 'test', 'customfield': "customvalue"}))
1720        self.assertEqual(result, 'test ++ customvalue')
1721
1722
    def test_logger_disabling(self):
        # Applying a config disables loggers that already exist (default
        # disable_existing_loggers=True); loggers created *after* an
        # application start out enabled.
        self.apply_config(self.disable_test)
        logger = logging.getLogger('some_pristine_logger')
        self.assertFalse(logger.disabled)
        # Re-applying with the default disables the now-existing logger.
        self.apply_config(self.disable_test)
        self.assertTrue(logger.disabled)
        # disable_existing_loggers=False re-enables previously known loggers.
        self.apply_config(self.disable_test, disable_existing_loggers=False)
        self.assertFalse(logger.disabled)
1731
1732    def test_config_set_handler_names(self):
1733        test_config = """
1734            [loggers]
1735            keys=root
1736
1737            [handlers]
1738            keys=hand1
1739
1740            [formatters]
1741            keys=form1
1742
1743            [logger_root]
1744            handlers=hand1
1745
1746            [handler_hand1]
1747            class=StreamHandler
1748            formatter=form1
1749
1750            [formatter_form1]
1751            format=%(levelname)s ++ %(message)s
1752            """
1753        self.apply_config(test_config)
1754        self.assertEqual(logging.getLogger().handlers[0].name, 'hand1')
1755
1756    def test_exception_if_confg_file_is_invalid(self):
1757        test_config = """
1758            [loggers]
1759            keys=root
1760
1761            [handlers]
1762            keys=hand1
1763
1764            [formatters]
1765            keys=form1
1766
1767            [logger_root]
1768            handlers=hand1
1769
1770            [handler_hand1]
1771            class=StreamHandler
1772            formatter=form1
1773
1774            [formatter_form1]
1775            format=%(levelname)s ++ %(message)s
1776
1777            prince
1778            """
1779
1780        file = io.StringIO(textwrap.dedent(test_config))
1781        self.assertRaises(RuntimeError, logging.config.fileConfig, file)
1782
1783    def test_exception_if_confg_file_is_empty(self):
1784        fd, fn = tempfile.mkstemp(prefix='test_empty_', suffix='.ini')
1785        os.close(fd)
1786        self.assertRaises(RuntimeError, logging.config.fileConfig, fn)
1787        os.remove(fn)
1788
1789    def test_exception_if_config_file_does_not_exist(self):
1790        self.assertRaises(FileNotFoundError, logging.config.fileConfig, 'filenotfound')
1791
1792    def test_defaults_do_no_interpolation(self):
1793        """bpo-33802 defaults should not get interpolated"""
1794        ini = textwrap.dedent("""
1795            [formatters]
1796            keys=default
1797
1798            [formatter_default]
1799
1800            [handlers]
1801            keys=console
1802
1803            [handler_console]
1804            class=logging.StreamHandler
1805            args=tuple()
1806
1807            [loggers]
1808            keys=root
1809
1810            [logger_root]
1811            formatter=default
1812            handlers=console
1813            """).strip()
1814        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
1815        try:
1816            os.write(fd, ini.encode('ascii'))
1817            os.close(fd)
1818            logging.config.fileConfig(
1819                fn,
1820                encoding="utf-8",
1821                defaults=dict(
1822                    version=1,
1823                    disable_existing_loggers=False,
1824                    formatters={
1825                        "generic": {
1826                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
1827                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
1828                            "class": "logging.Formatter"
1829                        },
1830                    },
1831                )
1832            )
1833        finally:
1834            os.unlink(fn)
1835
1836
@support.requires_working_socket()
@threading_helper.requires_working_threading()
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    server_class = TestTCPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_socket, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            # Remember the failure; each test skips itself when this is set.
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SocketHandler
        if isinstance(server.server_address, tuple):
            # TCP/IP server: connect by host and (OS-assigned) port.
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            # AF_UNIX server: the address is a filesystem path, no port.
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        # Released once per record received by handle_socket().
        self.handled = threading.Semaphore(0)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
            if self.server:
                self.server.stop()
        finally:
            BaseTest.tearDown(self)

    def handle_socket(self, request):
        # Server-side reader: each record arrives as a 4-byte big-endian
        # length prefix followed by a pickled LogRecord dict (the wire
        # format SocketHandler emits).
        conn = request.connection
        while True:
            chunk = conn.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = conn.recv(slen)
            while len(chunk) < slen:
                # Keep reading until the whole pickle has arrived.
                chunk = chunk + conn.recv(slen - len(chunk))
            obj = pickle.loads(chunk)
            record = logging.makeLogRecord(obj)
            self.log_output += record.msg + '\n'
            self.handled.release()

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("tcp")
        logger.error("spam")
        self.handled.acquire()
        logger.debug("eggs")
        self.handled.acquire()
        self.assertEqual(self.log_output, "spam\neggs\n")

    def test_noserver(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # Avoid timing-related failures due to SocketHandler's own hard-wired
        # one-second timeout on socket.create_connection() (issue #16264).
        self.sock_hdlr.retryStart = 2.5
        # Kill the server
        self.server.stop()
        # The logging call should try to connect, which should fail
        try:
            raise RuntimeError('Deliberate mistake')
        except RuntimeError:
            self.root_logger.exception('Never sent')
        self.root_logger.error('Never sent, either')
        # The handler must have scheduled a retry in the future, and a
        # log call after the retry time must not blow up either.
        now = time.time()
        self.assertGreater(self.sock_hdlr.retryTime, now)
        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
        self.root_logger.error('Nor this')
1927
1928
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSocketHandlerTest(SocketHandlerTest):

    """Test for SocketHandler with unix sockets."""

    # Guarded so the class body still evaluates on platforms without
    # AF_UNIX (the skipUnless decorator then skips every test).
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixStreamServer

    def setUp(self):
        # override the definition in the base class
        self.address = socket_helper.create_unix_domain_name()
        self.addCleanup(os_helper.unlink, self.address)
        SocketHandlerTest.setUp(self)
1942
@support.requires_working_socket()
@threading_helper.requires_working_threading()
class DatagramHandlerTest(BaseTest):

    """Test for DatagramHandler."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a UDP server to receive log messages, and a DatagramHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_datagram, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            # Remember the failure; each test skips itself when this is set.
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.DatagramHandler
        if isinstance(server.server_address, tuple):
            # UDP/IP server: connect by host and (OS-assigned) port.
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            # AF_UNIX datagram server: the address is a filesystem path.
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        # Set each time a datagram is received by handle_datagram().
        self.handled = threading.Event()

    def tearDown(self):
        """Shutdown the UDP server."""
        try:
            if self.server:
                self.server.stop()
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        # DatagramHandler prefixes each pickled record with a 4-byte
        # length word; strip it before unpickling.
        slen = struct.pack('>L', 0) # length of prefix
        packet = request.packet[len(slen):]
        obj = pickle.loads(packet)
        record = logging.makeLogRecord(obj)
        self.log_output += record.msg + '\n'
        self.handled.set()

    def test_output(self):
        # The log message sent to the DatagramHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("udp")
        logger.error("spam")
        self.handled.wait()
        self.handled.clear()
        logger.error("eggs")
        self.handled.wait()
        self.assertEqual(self.log_output, "spam\neggs\n")
2008
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixDatagramHandlerTest(DatagramHandlerTest):

    """Test for DatagramHandler using Unix sockets."""

    # Guarded so the class body still evaluates on platforms without
    # AF_UNIX (the skipUnless decorator then skips every test).
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # override the definition in the base class
        self.address = socket_helper.create_unix_domain_name()
        self.addCleanup(os_helper.unlink, self.address)
        DatagramHandlerTest.setUp(self)
2022
@support.requires_working_socket()
@threading_helper.requires_working_threading()
class SysLogHandlerTest(BaseTest):

    """Test for SysLogHandler using UDP."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a UDP server to receive log messages, and a SysLogHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sl_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_datagram, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            # Remember the failure; each test skips itself when this is set.
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SysLogHandler
        if isinstance(server.server_address, tuple):
            # UDP/IP server: SysLogHandler takes a (host, port) tuple.
            self.sl_hdlr = hcls((server.server_address[0], server.port))
        else:
            # AF_UNIX datagram server: the address is a filesystem path.
            self.sl_hdlr = hcls(server.server_address)
        self.log_output = b''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sl_hdlr)
        # Set each time a datagram is received by handle_datagram().
        self.handled = threading.Event()

    def tearDown(self):
        """Shutdown the server."""
        try:
            if self.server:
                self.server.stop()
            if self.sl_hdlr:
                self.root_logger.removeHandler(self.sl_hdlr)
                self.sl_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        # Record the raw syslog packet and wake up the waiting test.
        self.log_output = request.packet
        self.handled.set()

    def test_output(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # The log message sent to the SysLogHandler is properly received.
        # Assert the wait() results so a timeout fails with a clear message
        # rather than a confusing comparison against stale output.
        logger = logging.getLogger("slh")
        logger.error("sp\xe4m")
        self.assertTrue(self.handled.wait(support.LONG_TIMEOUT),
                        "SysLog datagram not received in time")
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
        self.handled.clear()
        self.sl_hdlr.append_nul = False
        logger.error("sp\xe4m")
        self.assertTrue(self.handled.wait(support.LONG_TIMEOUT),
                        "SysLog datagram not received in time")
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
        self.handled.clear()
        self.sl_hdlr.ident = "h\xe4m-"
        logger.error("sp\xe4m")
        self.assertTrue(self.handled.wait(support.LONG_TIMEOUT),
                        "SysLog datagram not received in time")
        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')

    def test_udp_reconnection(self):
        if self.server_exception:
            # Consistent with test_output(): skip when setUp() failed,
            # otherwise self.sl_hdlr is None and this test errors out
            # with an AttributeError instead of skipping.
            self.skipTest(self.server_exception)
        logger = logging.getLogger("slh")
        # Closing the handler's socket must not break logging: the handler
        # reopens its socket on the next emit.
        self.sl_hdlr.close()
        self.handled.clear()
        logger.error("sp\xe4m")
        self.assertTrue(self.handled.wait(support.LONG_TIMEOUT),
                        "SysLog datagram not received in time")
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
2099
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with Unix sockets."""

    # Guarded so the class body still evaluates on platforms without
    # AF_UNIX (the skipUnless decorator then skips every test).
    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # override the definition in the base class
        self.address = socket_helper.create_unix_domain_name()
        self.addCleanup(os_helper.unlink, self.address)
        SysLogHandlerTest.setUp(self)
2113
@unittest.skipUnless(socket_helper.IPV6_ENABLED,
                     'IPv6 support required for this test.')
class IPv6SysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with IPv6 host."""

    server_class = TestUDPServer
    address = ('::1', 0)

    def setUp(self):
        # Force the shared server class onto IPv6 for the duration of
        # this test class.
        self.server_class.address_family = socket.AF_INET6
        super().setUp()

    def tearDown(self):
        # Restore IPv4 so sibling tests see the default address family.
        self.server_class.address_family = socket.AF_INET
        super().tearDown()
2130
@support.requires_working_socket()
@threading_helper.requires_working_threading()
class HTTPHandlerTest(BaseTest):
    """Test for HTTPHandler."""

    def setUp(self):
        """Set up an HTTP server to receive log messages, and a HTTPHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Set once per request by handle_request().
        self.handled = threading.Event()

    def handle_request(self, request):
        # Capture the method, parsed path and (for POST) the body so the
        # test can assert on them, then acknowledge the request.
        self.command = request.command
        self.log_data = urlparse(request.path)
        if self.command == 'POST':
            try:
                rlen = int(request.headers['Content-Length'])
                self.post_data = request.rfile.read(rlen)
            except Exception:
                # Narrowed from a bare 'except:', which would also swallow
                # KeyboardInterrupt/SystemExit. A missing or malformed
                # Content-Length leaves post_data None and the assertions
                # below report the failure.
                self.post_data = None
        request.send_response(200)
        request.end_headers()
        self.handled.set()

    def test_output(self):
        # The log message sent to the HTTPHandler is properly received,
        # over plain HTTP and (when the ssl module is available) HTTPS.
        logger = logging.getLogger("http")
        root_logger = self.root_logger
        root_logger.removeHandler(self.root_logger.handlers[0])
        for secure in (False, True):
            addr = ('localhost', 0)
            if secure:
                try:
                    import ssl
                except ImportError:
                    sslctx = None
                else:
                    here = os.path.dirname(__file__)
                    localhost_cert = os.path.join(here, "certdata", "keycert.pem")
                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                    sslctx.load_cert_chain(localhost_cert)

                    # Client-side context that trusts the self-signed cert.
                    context = ssl.create_default_context(cafile=localhost_cert)
            else:
                sslctx = None
                context = None
            self.server = server = TestHTTPServer(addr, self.handle_request,
                                                  0.01, sslctx=sslctx)
            server.start()
            server.ready.wait()
            host = 'localhost:%d' % server.server_port
            # Fall back to plain HTTP when ssl could not be imported.
            secure_client = secure and sslctx
            self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
                                                       secure=secure_client,
                                                       context=context,
                                                       credentials=('foo', 'bar'))
            self.log_data = None
            root_logger.addHandler(self.h_hdlr)

            for method in ('GET', 'POST'):
                self.h_hdlr.method = method
                self.handled.clear()
                msg = "sp\xe4m"
                logger.error(msg)
                handled = self.handled.wait(support.SHORT_TIMEOUT)
                self.assertTrue(handled, "HTTP request timed out")
                self.assertEqual(self.log_data.path, '/frob')
                self.assertEqual(self.command, method)
                if method == 'GET':
                    d = parse_qs(self.log_data.query)
                else:
                    d = parse_qs(self.post_data.decode('utf-8'))
                self.assertEqual(d['name'], ['http'])
                self.assertEqual(d['funcName'], ['test_output'])
                self.assertEqual(d['msg'], [msg])

            self.server.stop()
            self.root_logger.removeHandler(self.h_hdlr)
            self.h_hdlr.close()
2210
class MemoryTest(BaseTest):

    """Test memory persistence of logger objects."""

    def setUp(self):
        """Create a dict to remember potentially destroyed objects."""
        BaseTest.setUp(self)
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Watch the given objects for survival, by creating weakrefs to
        them."""
        for watched in args:
            self._survivors[(id(watched), repr(watched))] = weakref.ref(watched)

    def _assertTruesurvival(self):
        """Assert that all objects watched for survival have survived."""
        # Trigger cycle breaking.
        gc.collect()
        dead = [repr_
                for (id_, repr_), ref in self._survivors.items()
                if ref() is None]
        if dead:
            self.fail("%d objects should have survived "
                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        #  if visible references are destroyed.
        self.root_logger.setLevel(logging.INFO)
        first_ref = logging.getLogger("foo")
        self._watch_for_survival(first_ref)
        first_ref.setLevel(logging.DEBUG)
        self.root_logger.debug(self.next_message())
        first_ref.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
        ])
        del first_ref
        # The logger has survived the deletion of our reference.
        self._assertTruesurvival()
        # Fetching it again yields the same configured object.
        second_ref = logging.getLogger("foo")
        second_ref.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
            ('foo', 'DEBUG', '3'),
        ])
2261
2262
class EncodingTest(BaseTest):
    def test_encoding_plain_file(self):
        # A FileHandler opened with an explicit encoding must round-trip
        # non-ASCII message text through the file.
        log = logging.getLogger("test")
        fn = make_temp_file(".log", "test_logging-1-")
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            handler = logging.FileHandler(fn, encoding="utf-8")
            log.addHandler(handler)
            try:
                # write non-ascii data to the log.
                log.warning(data)
            finally:
                log.removeHandler(handler)
                handler.close()
            # check we wrote exactly those bytes, ignoring trailing \n etc
            with open(fn, encoding="utf-8") as f:
                self.assertEqual(f.read().rstrip(), data)
        finally:
            if os.path.isfile(fn):
                os.remove(fn)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Route the handler through a CP-1251 codec writer wrapping a
        # byte buffer, so we can inspect the encoded output directly.
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        raw = io.BytesIO()
        handler = logging.StreamHandler(writer_class(raw, 'strict'))
        log.addHandler(handler)
        try:
            log.warning(message)
        finally:
            log.removeHandler(handler)
            handler.close()
        # check we wrote exactly those bytes, ignoring trailing \n etc
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(raw.getvalue(),
                         b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
2309
2310
class WarningsTest(BaseTest):

    def test_warnings(self):
        """captureWarnings(True) routes warnings to the 'py.warnings' logger."""
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)
            warnings.filterwarnings("always", category=UserWarning)
            stream = io.StringIO()
            handler = logging.StreamHandler(stream)
            logger = logging.getLogger("py.warnings")
            logger.addHandler(handler)
            warnings.warn("I'm warning you...")
            logger.removeHandler(handler)
            captured = stream.getvalue()
            handler.close()
            self.assertGreater(captured.find("UserWarning: I'm warning you...\n"), 0)

            # Passing an explicit file bypasses logging redirection and
            # uses the original showwarning implementation.
            a_file = io.StringIO()
            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                 a_file, "Dummy line")
            written = a_file.getvalue()
            a_file.close()
            self.assertEqual(written,
                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")

    def test_warnings_no_handlers(self):
        """A captured warning adds a NullHandler when none is configured."""
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)

            # confirm our assumption: no loggers are set
            logger = logging.getLogger("py.warnings")
            self.assertEqual(logger.handlers, [])

            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
            self.assertEqual(len(logger.handlers), 1)
            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
2349
2350
def formatFunc(format, datefmt=None):
    """Formatter factory referenced by dict-config tests via '()'."""
    formatter = logging.Formatter(format, datefmt)
    return formatter
2353
class myCustomFormatter:
    """Formatter stand-in: accepts the Formatter __init__ signature but is
    deliberately not a logging.Formatter (used by config validation tests)."""
    def __init__(self, fmt, datefmt=None):
        pass
2357
def handlerFunc():
    """Handler factory referenced by dict-config tests via '()'."""
    handler = logging.StreamHandler()
    return handler
2360
class CustomHandler(logging.StreamHandler):
    """Trivial StreamHandler subclass used to test custom handler classes
    in dict-based configuration."""
    pass
2363
class CustomListener(logging.handlers.QueueListener):
    """Trivial QueueListener subclass used to test custom listener classes
    in dict-based configuration."""
    pass
2366
class CustomQueue(queue.Queue):
    """Trivial Queue subclass used to test custom queue classes in
    dict-based configuration."""
    pass
2369
class CustomQueueProtocol:
    """Duck-typed queue: wraps a real queue.Queue and forwards every
    attribute access to it without subclassing Queue."""

    def __init__(self, maxsize=0):
        self.queue = queue.Queue(maxsize)

    def __getattr__(self, attribute):
        # Look up 'queue' via object.__getattribute__ so a missing
        # 'queue' attribute raises AttributeError instead of recursing
        # back into this method.
        inner = object.__getattribute__(self, 'queue')
        return getattr(inner, attribute)
2377
class CustomQueueFakeProtocol(CustomQueueProtocol):
    # An object implementing the minimal Queue API for
    # the logging module but with incorrect signatures.
    #
    # The object will be considered a valid queue class since we
    # do not check the signatures (only callability of methods)
    # but will NOT be usable in production since a TypeError will
    # be raised due to the extra argument in 'put_nowait'.
    def put_nowait(self):
        # Deliberately missing the item parameter (see note above).
        pass
2388
class CustomQueueWrongProtocol(CustomQueueProtocol):
    # Deliberately breaks the queue protocol: put_nowait is not callable,
    # so config validation must reject this class.
    put_nowait = None
2391
class MinimalQueueProtocol:
    """Bare-minimum object accepted as a queue by the logging config:
    only callable put_nowait() and get() are required."""
    def put_nowait(self, x): pass
    def get(self): pass
2395
def queueMaker():
    """Queue factory referenced by dict-config tests via '()'."""
    made = queue.Queue()
    return made
2398
def listenerMaker(arg1, arg2, respect_handler_level=False):
    """Return a listener factory for dict-config tests.

    arg1/arg2 are accepted (and ignored) so configs can exercise passing
    positional arguments to a factory; the returned callable builds a
    CustomListener, defaulting respect_handler_level from the closure.
    """
    def make_listener(queue, *handlers, **kwargs):
        kwargs.setdefault('respect_handler_level', respect_handler_level)
        return CustomListener(queue, *handlers, **kwargs)
    return make_listener
2404
2405class ConfigDictTest(BaseTest):
2406
2407    """Reading logging config from a dictionary."""
2408
2409    check_no_resource_warning = warnings_helper.check_no_resource_warning
2410    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2411
2412    # config0 is a standard configuration.
2413    config0 = {
2414        'version': 1,
2415        'formatters': {
2416            'form1' : {
2417                'format' : '%(levelname)s ++ %(message)s',
2418            },
2419        },
2420        'handlers' : {
2421            'hand1' : {
2422                'class' : 'logging.StreamHandler',
2423                'formatter' : 'form1',
2424                'level' : 'NOTSET',
2425                'stream'  : 'ext://sys.stdout',
2426            },
2427        },
2428        'root' : {
2429            'level' : 'WARNING',
2430            'handlers' : ['hand1'],
2431        },
2432    }
2433
2434    # config1 adds a little to the standard configuration.
2435    config1 = {
2436        'version': 1,
2437        'formatters': {
2438            'form1' : {
2439                'format' : '%(levelname)s ++ %(message)s',
2440            },
2441        },
2442        'handlers' : {
2443            'hand1' : {
2444                'class' : 'logging.StreamHandler',
2445                'formatter' : 'form1',
2446                'level' : 'NOTSET',
2447                'stream'  : 'ext://sys.stdout',
2448            },
2449        },
2450        'loggers' : {
2451            'compiler.parser' : {
2452                'level' : 'DEBUG',
2453                'handlers' : ['hand1'],
2454            },
2455        },
2456        'root' : {
2457            'level' : 'WARNING',
2458        },
2459    }
2460
2461    # config1a moves the handler to the root. Used with config8a
2462    config1a = {
2463        'version': 1,
2464        'formatters': {
2465            'form1' : {
2466                'format' : '%(levelname)s ++ %(message)s',
2467            },
2468        },
2469        'handlers' : {
2470            'hand1' : {
2471                'class' : 'logging.StreamHandler',
2472                'formatter' : 'form1',
2473                'level' : 'NOTSET',
2474                'stream'  : 'ext://sys.stdout',
2475            },
2476        },
2477        'loggers' : {
2478            'compiler.parser' : {
2479                'level' : 'DEBUG',
2480            },
2481        },
2482        'root' : {
2483            'level' : 'WARNING',
2484            'handlers' : ['hand1'],
2485        },
2486    }
2487
2488    # config2 has a subtle configuration error that should be reported
2489    config2 = {
2490        'version': 1,
2491        'formatters': {
2492            'form1' : {
2493                'format' : '%(levelname)s ++ %(message)s',
2494            },
2495        },
2496        'handlers' : {
2497            'hand1' : {
2498                'class' : 'logging.StreamHandler',
2499                'formatter' : 'form1',
2500                'level' : 'NOTSET',
2501                'stream'  : 'ext://sys.stdbout',
2502            },
2503        },
2504        'loggers' : {
2505            'compiler.parser' : {
2506                'level' : 'DEBUG',
2507                'handlers' : ['hand1'],
2508            },
2509        },
2510        'root' : {
2511            'level' : 'WARNING',
2512        },
2513    }
2514
    # As config1 but with a misspelt level on a handler.
    # The 'NTOSET' typo is deliberate: test_config2a_failure expects
    # dictConfig() to reject this configuration.
    config2a = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NTOSET',  # deliberate typo for 'NOTSET'
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }


    # As config1 but with a misspelt level on a logger.
    # The 'WRANING' typo is deliberate: test_config2b_failure expects
    # dictConfig() to reject this configuration.
    config2b = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WRANING',  # deliberate typo for 'WARNING'
        },
    }

    # config3 has a less subtle configuration error: the handler names a
    # formatter that is never defined (used by test_config3_failure).
    config3 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'misspelled_name',  # no such formatter defined
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }
2596
    # config4 specifies a custom formatter class to be loaded.
    # The '()' key names a factory as a dotted-path string which is
    # resolved when the config is applied.
    config4 = {
        'version': 1,
        'formatters': {
            'form1' : {
                '()' : __name__ + '.ExceptionFormatter',
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'root' : {
            'level' : 'NOTSET',
                'handlers' : ['hand1'],
        },
    }

    # As config4 but using an actual callable rather than a string.
    # Covers all three '()' forms: a class object (form1), a dotted-path
    # string to a function (form2) and a function object (form3), plus a
    # handler built by a factory function (hand2).
    config4a = {
        'version': 1,
        'formatters': {
            'form1' : {
                '()' : ExceptionFormatter,
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
            'form2' : {
                '()' : __name__ + '.formatFunc',
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
            'form3' : {
                '()' : formatFunc,
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
            'hand2' : {
                '()' : handlerFunc,
            },
        },
        'root' : {
            'level' : 'NOTSET',
                'handlers' : ['hand1'],
        },
    }
2653
    # config5 specifies a custom handler class to be loaded
    # (resolved from a dotted-path string in this module).
    config5 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : __name__ + '.CustomHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config6 specifies a custom handler class to be loaded
    # but has bad arguments: '9' is not a valid keyword-argument name,
    # so constructing the handler must fail (test_config6_failure).
    config6 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : __name__ + '.CustomHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
                '9' : 'invalid parameter name',  # deliberately bad kwarg
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config 7 does not define compiler.parser but defines compiler.lexer
    # so compiler.parser should be disabled after applying it
    # (disable_existing_loggers defaults to True).
    config7 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.lexer' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }
2737
    # config8 defines both compiler and compiler.lexer
    # so compiler.parser should not be disabled (since
    # compiler is defined)
    config8 = {
        'version': 1,
        'disable_existing_loggers' : False,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
            'compiler.lexer' : {
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config8a is as config8 but explicitly disables existing loggers;
    # children of 'compiler' are still kept alive because 'compiler'
    # itself is configured.
    config8a = {
        'version': 1,
        'disable_existing_loggers' : True,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
            'compiler.lexer' : {
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config9 sets both handler and logger to WARNING; it is the baseline
    # for the incremental updates applied by config9a/config9b.
    config9 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'WARNING',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'WARNING',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'NOTSET',
        },
    }
2825
    # Incremental update on top of config9: lowers the logger level to
    # INFO but leaves the handler at WARNING (so nothing new is emitted).
    config9a = {
        'version': 1,
        'incremental' : True,
        'handlers' : {
            'hand1' : {
                'level' : 'WARNING',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'INFO',
            },
        },
    }

    # Second incremental update: also lowers the handler to INFO, so
    # INFO records now make it through.
    config9b = {
        'version': 1,
        'incremental' : True,
        'handlers' : {
            'hand1' : {
                'level' : 'INFO',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'INFO',
            },
        },
    }

    # As config1 but with a filter added; 'filt1' is a name-based filter
    # that only passes records from the 'compiler.parser' subtree.
    config10 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'filters' : {
            'filt1' : {
                'name' : 'compiler.parser',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
                'filters' : ['filt1'],
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'filters' : ['filt1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }
2889
    # As config1 but using cfg:// references: 'formatters' and 'handlers'
    # point back into other keys of this same dict.
    config11 = {
        'version': 1,
        'true_formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handler_configs': {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'formatters' : 'cfg://true_formatters',
        'handlers' : {
            'hand1' : 'cfg://handler_configs[hand1]',
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # As config11 but missing the version key; dictConfig() must reject
    # it (test_config12_failure).
    config12 = {
        'true_formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handler_configs': {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'formatters' : 'cfg://true_formatters',
        'handlers' : {
            'hand1' : 'cfg://handler_configs[hand1]',
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # As config11 but using an unsupported version (only 1 is valid);
    # dictConfig() must reject it (test_config13_failure).
    config13 = {
        'version': 2,
        'true_formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handler_configs': {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'formatters' : 'cfg://true_formatters',
        'handlers' : {
            'hand1' : 'cfg://handler_configs[hand1]',
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }
2981
    # As config0, but with properties: the '.' sub-dict sets attributes
    # directly on the constructed handler instance (foo, terminator).
    config14 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
                '.': {
                    'foo': 'bar',
                    'terminator': '!\n',
                }
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }

    # config0 but with default values for formatter. Skipped 15, it is defined
    # in the test code. 'defaults' supplies a value for %(customfield)s
    # when a record does not carry one.
    config16 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(message)s ++ %(customfield)s',
                'defaults': {"customfield": "defaultvalue"}
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }
3031
3032    class CustomFormatter(logging.Formatter):
3033        custom_property = "."
3034
3035        def format(self, record):
3036            return super().format(record)
3037
    # Uses the CustomFormatter class above; the '.' sub-dict sets
    # custom_property on the constructed formatter instance.
    config17 = {
        'version': 1,
        'formatters': {
            "custom": {
                "()": CustomFormatter,
                "style": "{",
                "datefmt": "%Y-%m-%d %H:%M:%S",
                "format": "{message}", # <-- to force an exception when configuring
                ".": {
                    "custom_property": "value"
                }
            }
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'custom',
                'level' : 'NOTSET',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }

    # A MemoryHandler whose 'target' refers to another configured handler
    # by name ('console').
    config18  = {
        "version": 1,
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "DEBUG",
            },
            "buffering": {
                "class": "logging.handlers.MemoryHandler",
                "capacity": 5,
                "target": "console",
                "level": "DEBUG",
                "flushLevel": "ERROR"
            }
        },
        "loggers": {
            "mymodule": {
                "level": "DEBUG",
                "handlers": ["buffering"],
                "propagate": "true"
            }
        }
    }

    # Deliberately inconsistent: a %-style format string combined with
    # "style": "$", used to exercise format validation failures.
    bad_format = {
        "version": 1,
        "formatters": {
            "mySimpleFormatter": {
                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
                "style": "$"
            }
        },
        "handlers": {
            "fileGlobal": {
                "class": "logging.StreamHandler",
                "level": "DEBUG",
                "formatter": "mySimpleFormatter"
            },
            "bufferGlobal": {
                "class": "logging.handlers.MemoryHandler",
                "capacity": 5,
                "formatter": "mySimpleFormatter",
                "target": "fileGlobal",
                "level": "DEBUG"
            }
        },
        "loggers": {
            "mymodule": {
                "level": "DEBUG",
                "handlers": ["bufferGlobal"],
                "propagate": "true"
            }
        }
    }
3119
    # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False
    custom_formatter_class_validate = {
        'version': 1,
        'formatters': {
            'form1': {
                '()': __name__ + '.ExceptionFormatter',
                'format': '%(levelname)s:%(name)s:%(message)s',
                'validate': False,
            },
        },
        'handlers' : {
            'hand1' : {
                'class': 'logging.StreamHandler',
                'formatter': 'form1',
                'level': 'NOTSET',
                'stream': 'ext://sys.stdout',
            },
        },
        "loggers": {
            "my_test_logger_custom_formatter": {
                "level": "DEBUG",
                "handlers": ["hand1"],
                "propagate": "true"
            }
        }
    }

    # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False
    custom_formatter_class_validate2 = {
        'version': 1,
        'formatters': {
            'form1': {
                'class': __name__ + '.ExceptionFormatter',
                'format': '%(levelname)s:%(name)s:%(message)s',
                'validate': False,
            },
        },
        'handlers' : {
            'hand1' : {
                'class': 'logging.StreamHandler',
                'formatter': 'form1',
                'level': 'NOTSET',
                'stream': 'ext://sys.stdout',
            },
        },
        "loggers": {
            "my_test_logger_custom_formatter": {
                "level": "DEBUG",
                "handlers": ["hand1"],
                "propagate": "true"
            }
        }
    }

    # Configuration with custom class that is not inherited from logging.Formatter
    custom_formatter_class_validate3 = {
        'version': 1,
        'formatters': {
            'form1': {
                'class': __name__ + '.myCustomFormatter',
                'format': '%(levelname)s:%(name)s:%(message)s',
                'validate': False,
            },
        },
        'handlers' : {
            'hand1' : {
                'class': 'logging.StreamHandler',
                'formatter': 'form1',
                'level': 'NOTSET',
                'stream': 'ext://sys.stdout',
            },
        },
        "loggers": {
            "my_test_logger_custom_formatter": {
                "level": "DEBUG",
                "handlers": ["hand1"],
                "propagate": "true"
            }
        }
    }
3200
    # Configuration with custom function, 'validate' set to False and no defaults
    custom_formatter_with_function = {
        'version': 1,
        'formatters': {
            'form1': {
                '()': formatFunc,
                'format': '%(levelname)s:%(name)s:%(message)s',
                'validate': False,
            },
        },
        'handlers' : {
            'hand1' : {
                'class': 'logging.StreamHandler',
                'formatter': 'form1',
                'level': 'NOTSET',
                'stream': 'ext://sys.stdout',
            },
        },
        "loggers": {
            "my_test_logger_custom_formatter": {
                "level": "DEBUG",
                "handlers": ["hand1"],
                "propagate": "true"
            }
        }
    }

    # Configuration with custom function, and defaults for %(customfield)s
    custom_formatter_with_defaults = {
        'version': 1,
        'formatters': {
            'form1': {
                '()': formatFunc,
                'format': '%(levelname)s:%(name)s:%(message)s:%(customfield)s',
                'defaults': {"customfield": "myvalue"}
            },
        },
        'handlers' : {
            'hand1' : {
                'class': 'logging.StreamHandler',
                'formatter': 'form1',
                'level': 'NOTSET',
                'stream': 'ext://sys.stdout',
            },
        },
        "loggers": {
            "my_test_logger_custom_formatter": {
                "level": "DEBUG",
                "handlers": ["hand1"],
                "propagate": "true"
            }
        }
    }

    # QueueHandler wired to a FileHandler via the 'handlers' key.
    # NOTE(review): 'h1' has no 'filename'; presumably the test code fills
    # one in before applying this config -- confirm against the test.
    config_queue_handler = {
        'version': 1,
        'handlers' : {
            'h1' : {
                'class': 'logging.FileHandler',
            },
             # key is before depended on handlers to test that deferred config works
            'ah' : {
                'class': 'logging.handlers.QueueHandler',
                'handlers': ['h1']
            },
        },
        "root": {
            "level": "DEBUG",
            "handlers": ["ah"]
        }
    }
3272
3273    def apply_config(self, conf):
3274        logging.config.dictConfig(conf)
3275
3276    def check_handler(self, name, cls):
3277        h = logging.getHandlerByName(name)
3278        self.assertIsInstance(h, cls)
3279
    def test_config0_ok(self):
        # A simple config which overrides the default settings.
        # Only the ERROR record is expected in the captured stdout.
        with support.captured_stdout() as output:
            self.apply_config(self.config0)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
3295
    # The default argument captures config1 at class-definition time;
    # other tests (e.g. test_config5_ok, test_config11_ok) reuse this
    # method with a different config.
    def test_config1_ok(self, config=config1):
        # A config defining a sub-parser as well.
        with support.captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
3310
3311    def test_config2_failure(self):
3312        # A simple config which overrides the default settings.
3313        self.assertRaises(Exception, self.apply_config, self.config2)
3314
3315    def test_config2a_failure(self):
3316        # A simple config which overrides the default settings.
3317        self.assertRaises(Exception, self.apply_config, self.config2a)
3318
3319    def test_config2b_failure(self):
3320        # A simple config which overrides the default settings.
3321        self.assertRaises(Exception, self.apply_config, self.config2b)
3322
3323    def test_config3_failure(self):
3324        # A simple config which overrides the default settings.
3325        self.assertRaises(Exception, self.apply_config, self.config3)
3326
    def test_config4_ok(self):
        # A config specifying a custom formatter class.
        # The custom formatter is expected to append the exception type
        # ("Got a [RuntimeError]") to the formatted output.
        with support.captured_stdout() as output:
            self.apply_config(self.config4)
            self.check_handler('hand1', logging.StreamHandler)
            #logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])
3342
    def test_config4a_ok(self):
        # A config specifying a custom formatter class.
        # Same expectation as test_config4_ok, but the factory is given
        # as an actual callable rather than a dotted-path string.
        with support.captured_stdout() as output:
            self.apply_config(self.config4a)
            #logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])
3357
    def test_config5_ok(self):
        # config5 is config1 with a custom handler class, so the shared
        # config1 checks apply; additionally verify the handler's type.
        self.test_config1_ok(config=self.config5)
        self.check_handler('hand1', CustomHandler)
3361
3362    def test_config6_failure(self):
3363        self.assertRaises(Exception, self.apply_config, self.config6)
3364
    def test_config7_ok(self):
        # First apply config1, which configures compiler.parser.
        with support.captured_stdout() as output:
            self.apply_config(self.config1)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        # config7 configures only compiler.lexer; since it does not list
        # compiler.parser, that logger should be disabled afterwards.
        with support.captured_stdout() as output:
            self.apply_config(self.config7)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            self.assertTrue(logger.disabled)
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '3'),
                ('ERROR', '4'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
3393
    # Same as test_config_7_ok but don't disable old loggers.
    def test_config_8_ok(self):
        with support.captured_stdout() as output:
            self.apply_config(self.config1)
            logger = logging.getLogger("compiler.parser")
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        # config8 sets disable_existing_loggers False and configures the
        # parent 'compiler', so compiler.parser stays enabled.
        with support.captured_stdout() as output:
            self.apply_config(self.config8)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '3'),
                ('ERROR', '4'),
                ('INFO', '5'),
                ('ERROR', '6'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
3428
    def test_config_8a_ok(self):
        # config1a is defined elsewhere in this class (not shown here).
        with support.captured_stdout() as output:
            self.apply_config(self.config1a)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        # config8a disables existing loggers, but compiler.* survive
        # because 'compiler' itself is configured; the hyphenated logger
        # is not configured and so its output should be dropped.
        with support.captured_stdout() as output:
            self.apply_config(self.config8a)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
3473
    def test_config_9_ok(self):
        # Exercises incremental configs: config9 is the baseline, and
        # config9a/config9b progressively lower the effective levels.
        with support.captured_stdout() as output:
            self.apply_config(self.config9)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            # Nothing will be output since both handler and logger are set to WARNING
            logger.info(self.next_message())
            self.assert_log_lines([], stream=output)
            self.apply_config(self.config9a)
            # Nothing will be output since handler is still set to WARNING
            logger.info(self.next_message())
            self.assert_log_lines([], stream=output)
            self.apply_config(self.config9b)
            # Message should now be output
            logger.info(self.next_message())
            self.assert_log_lines([
                ('INFO', '3'),
            ], stream=output)
3492
    def test_config_10_ok(self):
        # config10 attaches a filter: records from 'compiler' and
        # 'compiler.lexer' are dropped, while 'compiler.parser' and its
        # descendants pass through to the stream handler.
        with support.captured_stdout() as output:
            self.apply_config(self.config10)
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler')
            # Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler.lexer')
            # Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger("compiler.parser.codegen")
            # Output, as not filtered
            logger.error(self.next_message())
            self.assert_log_lines([
                ('WARNING', '1'),
                ('ERROR', '4'),
            ], stream=output)
3512
    def test_config11_ok(self):
        # config11 is a variant of config1; reuse the config1 assertions.
        self.test_config1_ok(self.config11)
3515
3516    def test_config12_failure(self):
3517        self.assertRaises(Exception, self.apply_config, self.config12)
3518
3519    def test_config13_failure(self):
3520        self.assertRaises(Exception, self.apply_config, self.config13)
3521
    def test_config14_ok(self):
        # config14 passes extra keyword properties to the handler: a custom
        # attribute 'foo' and a non-default line terminator.
        with support.captured_stdout() as output:
            self.apply_config(self.config14)
            h = logging._handlers['hand1']
            self.assertEqual(h.foo, 'bar')
            self.assertEqual(h.terminator, '!\n')
            logging.warning('Exclamation')
            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3530
    def test_config15_ok(self):
        """Re-applying a FileHandler config must not leak the first
        handler's open stream (checked via ResourceWarning)."""

        with self.check_no_resource_warning():
            fn = make_temp_file(".log", "test_logging-X-")

            config = {
                "version": 1,
                "handlers": {
                    "file": {
                        "class": "logging.FileHandler",
                        "filename": fn,
                        "encoding": "utf-8",
                    }
                },
                "root": {
                    "handlers": ["file"]
                }
            }

            self.apply_config(config)
            # Second application replaces the handler; the first one must be
            # closed rather than leaked (hence check_no_resource_warning).
            self.apply_config(config)

        handler = logging.root.handlers[0]
        self.addCleanup(closeFileHandler, handler, fn)
3555
    def test_config16_ok(self):
        # config16's formatter supplies a default for 'customfield': a record
        # that provides the field uses its value, a record without it falls
        # back to the configured default.
        self.apply_config(self.config16)
        h = logging._handlers['hand1']

        # Custom value
        result = h.formatter.format(logging.makeLogRecord(
            {'msg': 'Hello', 'customfield': 'customvalue'}))
        self.assertEqual(result, 'Hello ++ customvalue')

        # Default value
        result = h.formatter.format(logging.makeLogRecord(
            {'msg': 'Hello'}))
        self.assertEqual(result, 'Hello ++ defaultvalue')
3569
    def test_config17_ok(self):
        # config17 sets a custom '.' property on the formatter instance.
        self.apply_config(self.config17)
        h = logging._handlers['hand1']
        self.assertEqual(h.formatter.custom_property, 'value')
3574
    def test_config18_ok(self):
        # config18's handler exposes 'flushLevel' (MemoryHandler-style
        # attribute — TODO confirm against the fixture); verify the config
        # value was applied.
        self.apply_config(self.config18)
        handler = logging.getLogger('mymodule').handlers[0]
        self.assertEqual(handler.flushLevel, logging.ERROR)
3579
    def setup_via_listener(self, text, verify=None):
        """Send *text* as a length-prefixed config payload to a
        logging.config listener thread, then shut the listener down.

        *verify*, if given, is passed to logging.config.listen() to
        transform/validate the received bytes before they are applied.
        """
        text = text.encode("utf-8")
        # Ask for a randomly assigned port (by using port 0)
        t = logging.config.listen(0, verify)
        t.start()
        t.ready.wait()
        # Now get the port allocated
        port = t.port
        t.ready.clear()
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(2.0)
            sock.connect(('localhost', port))

            # Wire format: 4-byte big-endian length prefix, then the payload.
            slen = struct.pack('>L', len(text))
            s = slen + text
            sentsofar = 0
            left = len(s)
            # sock.send() may send only part of the buffer; loop until done.
            while left > 0:
                sent = sock.send(s[sentsofar:])
                sentsofar += sent
                left -= sent
            sock.close()
        finally:
            # Wait until the listener has processed the config (it sets
            # 'ready' again) before asking it to stop.
            t.ready.wait(2.0)
            logging.config.stopListening()
            threading_helper.join_thread(t)
3607
    @support.requires_working_socket()
    def test_listen_config_10_ok(self):
        # Same scenario as test_config_10_ok, but config10 is delivered via
        # the socket listener instead of being applied directly.
        with support.captured_stdout() as output:
            self.setup_via_listener(json.dumps(self.config10))
            self.check_handler('hand1', logging.StreamHandler)
            logger = logging.getLogger("compiler.parser")
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler')
            # Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler.lexer')
            # Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger("compiler.parser.codegen")
            # Output, as not filtered
            logger.error(self.next_message())
            self.assert_log_lines([
                ('WARNING', '1'),
                ('ERROR', '4'),
            ], stream=output)
3628
    @support.requires_working_socket()
    def test_listen_config_1_ok(self):
        # Deliver the ini-format config1 (from ConfigFileTest) via the
        # socket listener and check the same assertions as test_config1_ok.
        with support.captured_stdout() as output:
            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
3643
    @support.requires_working_socket()
    def test_listen_verify(self):
        """Exercise the 'verify' callable of logging.config.listen():
        a verifier returning None rejects the config, no verifier accepts
        it unchanged, and a verifier may transform the received bytes.
        """

        # Rejects everything: listener must ignore the payload.
        def verify_fail(stuff):
            return None

        # Undoes a byte-reversal applied by the sender before transmission.
        def verify_reverse(stuff):
            return stuff[::-1]

        logger = logging.getLogger("compiler.parser")
        to_send = textwrap.dedent(ConfigFileTest.config1)
        # First, specify a verification function that will fail.
        # We expect to see no output, since our configuration
        # never took effect.
        with support.captured_stdout() as output:
            self.setup_via_listener(to_send, verify_fail)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
        self.assert_log_lines([], stream=output)
        # Original logger output has the stuff we logged.
        self.assert_log_lines([
            ('INFO', '1'),
            ('ERROR', '2'),
        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")

        # Now, perform no verification. Our configuration
        # should take effect.

        with support.captured_stdout() as output:
            self.setup_via_listener(to_send)    # no verify callable specified
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
        self.assert_log_lines([
            ('INFO', '3'),
            ('ERROR', '4'),
        ], stream=output)
        # Original logger output still has the stuff we logged before.
        self.assert_log_lines([
            ('INFO', '1'),
            ('ERROR', '2'),
        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")

        # Now, perform verification which transforms the bytes.

        with support.captured_stdout() as output:
            self.setup_via_listener(to_send[::-1], verify_reverse)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
        self.assert_log_lines([
            ('INFO', '5'),
            ('ERROR', '6'),
        ], stream=output)
        # Original logger output still has the stuff we logged before.
        self.assert_log_lines([
            ('INFO', '1'),
            ('ERROR', '2'),
        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3706
3707    def test_bad_format(self):
3708        self.assertRaises(ValueError, self.apply_config, self.bad_format)
3709
    def test_bad_format_with_dollar_style(self):
        # The same 'bad_format' config becomes valid once the format string
        # is rewritten in '$' (string.Template) style, which matches the
        # formatter's configured style.
        config = copy.deepcopy(self.bad_format)
        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"

        self.apply_config(config)
        handler = logging.getLogger('mymodule').handlers[0]
        self.assertIsInstance(handler.target, logging.Handler)
        self.assertIsInstance(handler.formatter._style,
                              logging.StringTemplateStyle)
        self.assertEqual(sorted(logging.getHandlerNames()),
                         ['bufferGlobal', 'fileGlobal'])
3721
3722    def test_custom_formatter_class_with_validate(self):
3723        self.apply_config(self.custom_formatter_class_validate)
3724        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3725        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3726
3727    def test_custom_formatter_class_with_validate2(self):
3728        self.apply_config(self.custom_formatter_class_validate2)
3729        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3730        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3731
3732    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
3733        config = self.custom_formatter_class_validate.copy()
3734        config['formatters']['form1']['style'] = "$"
3735
3736        # Exception should not be raised as we have configured 'validate' to False
3737        self.apply_config(config)
3738        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3739        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3740
3741    def test_custom_formatter_class_with_validate3(self):
3742        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)
3743
3744    def test_custom_formatter_function_with_validate(self):
3745        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)
3746
3747    def test_custom_formatter_function_with_defaults(self):
3748        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_defaults)
3749
    def test_baseconfig(self):
        """Exercise BaseConfigurator.convert() 'cfg://' resolution:
        dotted access, bracketed access (including keys with spaces,
        '%(' sequences and non-ASCII characters), nesting, empty-string
        keys, and the error cases for missing keys and unbalanced
        brackets.
        """
        d = {
            'atuple': (1, 2, 3),
            'alist': ['a', 'b', 'c'],
            'adict': {
                'd': 'e', 'f': 3 ,
                'alpha numeric 1 with spaces' : 5,
                'alpha numeric 1 %( - © ©ß¯' : 9,
                'alpha numeric ] 1 with spaces' : 15,
                'alpha ]] numeric 1 %( - © ©ß¯]' : 19,
                ' alpha [ numeric 1 %( - © ©ß¯] ' : 11,
                ' alpha ' : 32,
                '' : 10,
                'nest4' : {
                    'd': 'e', 'f': 3 ,
                    'alpha numeric 1 with spaces' : 5,
                    'alpha numeric 1 %( - © ©ß¯' : 9,
                    '' : 10,
                    'somelist' :  ('g', ('h', 'i'), 'j'),
                    'somedict' : {
                        'a' : 1,
                        'a with 1 and space' : 3,
                        'a with ( and space' : 4,
                    }
                }
            },
            'nest1': ('g', ('h', 'i'), 'j'),
            'nest2': ['k', ['l', 'm'], 'n'],
            'nest3': ['o', 'cfg://alist', 'p'],
        }
        bc = logging.config.BaseConfigurator(d)
        # Index and dotted access on flat and nested containers.
        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
        self.assertEqual(bc.convert('cfg://adict[alpha numeric 1 with spaces]'), 5)
        self.assertEqual(bc.convert('cfg://adict[alpha numeric 1 %( - © ©ß¯]'), 9)
        self.assertEqual(bc.convert('cfg://adict[]'), 10)
        self.assertEqual(bc.convert('cfg://adict.nest4.d'), 'e')
        self.assertEqual(bc.convert('cfg://adict.nest4[d]'), 'e')
        self.assertEqual(bc.convert('cfg://adict[nest4].d'), 'e')
        self.assertEqual(bc.convert('cfg://adict[nest4][f]'), 3)
        self.assertEqual(bc.convert('cfg://adict[nest4][alpha numeric 1 with spaces]'), 5)
        self.assertEqual(bc.convert('cfg://adict[nest4][alpha numeric 1 %( - © ©ß¯]'), 9)
        self.assertEqual(bc.convert('cfg://adict[nest4][]'), 10)
        self.assertEqual(bc.convert('cfg://adict[nest4][somelist][0]'), 'g')
        self.assertEqual(bc.convert('cfg://adict[nest4][somelist][1][0]'), 'h')
        self.assertEqual(bc.convert('cfg://adict[nest4][somelist][1][1]'), 'i')
        self.assertEqual(bc.convert('cfg://adict[nest4][somelist][2]'), 'j')
        self.assertEqual(bc.convert('cfg://adict[nest4].somedict.a'), 1)
        self.assertEqual(bc.convert('cfg://adict[nest4].somedict[a]'), 1)
        self.assertEqual(bc.convert('cfg://adict[nest4].somedict[a with 1 and space]'), 3)
        self.assertEqual(bc.convert('cfg://adict[nest4].somedict[a with ( and space]'), 4)
        self.assertEqual(bc.convert('cfg://adict.nest4.somelist[1][1]'), 'i')
        self.assertEqual(bc.convert('cfg://adict.nest4.somelist[2]'), 'j')
        self.assertEqual(bc.convert('cfg://adict.nest4.somedict.a'), 1)
        self.assertEqual(bc.convert('cfg://adict.nest4.somedict[a]'), 1)
        # A 'cfg://' string stored *inside* a value is itself resolved.
        v = bc.convert('cfg://nest3')
        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
        # Error cases: missing keys and malformed/unbalanced references.
        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
        self.assertRaises(ValueError, bc.convert, 'cfg://!')
        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
        self.assertRaises(KeyError, bc.convert, 'cfg://adict[alpha numeric ] 1 with spaces]')
        self.assertRaises(ValueError, bc.convert, 'cfg://adict[ alpha ]] numeric 1 %( - © ©ß¯] ]')
        self.assertRaises(ValueError, bc.convert, 'cfg://adict[ alpha [ numeric 1 %( - © ©ß¯] ]')
3817
    def test_namedtuple(self):
        # see bpo-39142
        # A namedtuple passed as a handler-constructor kwarg must reach the
        # handler intact (and not be misinterpreted by the configurator).
        from collections import namedtuple

        class MyHandler(logging.StreamHandler):
            def __init__(self, resource, *args, **kwargs):
                super().__init__(*args, **kwargs)
                # keep the namedtuple for use at emit time
                self.resource: namedtuple = resource

            def emit(self, record):
                record.msg += f' {self.resource.type}'
                return super().emit(record)

        Resource = namedtuple('Resource', ['type', 'labels'])
        resource = Resource(type='my_type', labels=['a'])

        config = {
            'version': 1,
            'handlers': {
                'myhandler': {
                    '()': MyHandler,
                    'resource': resource
                }
            },
            'root':  {'level': 'INFO', 'handlers': ['myhandler']},
        }
        with support.captured_stderr() as stderr:
            self.apply_config(config)
            logging.info('some log')
        # emit() appended the namedtuple's 'type' field to the message.
        self.assertEqual(stderr.getvalue(), 'some log my_type\n')
3848
3849    def test_config_callable_filter_works(self):
3850        def filter_(_):
3851            return 1
3852        self.apply_config({
3853            "version": 1, "root": {"level": "DEBUG", "filters": [filter_]}
3854        })
3855        assert logging.getLogger().filters[0] is filter_
3856        logging.getLogger().filters = []
3857
3858    def test_config_filter_works(self):
3859        filter_ = logging.Filter("spam.eggs")
3860        self.apply_config({
3861            "version": 1, "root": {"level": "DEBUG", "filters": [filter_]}
3862        })
3863        assert logging.getLogger().filters[0] is filter_
3864        logging.getLogger().filters = []
3865
3866    def test_config_filter_method_works(self):
3867        class FakeFilter:
3868            def filter(self, _):
3869                return 1
3870        filter_ = FakeFilter()
3871        self.apply_config({
3872            "version": 1, "root": {"level": "DEBUG", "filters": [filter_]}
3873        })
3874        assert logging.getLogger().filters[0] is filter_
3875        logging.getLogger().filters = []
3876
3877    def test_invalid_type_raises(self):
3878        class NotAFilter: pass
3879        for filter_ in [None, 1, NotAFilter()]:
3880            self.assertRaises(
3881                ValueError,
3882                self.apply_config,
3883                {"version": 1, "root": {"level": "DEBUG", "filters": [filter_]}}
3884            )
3885
    def do_queuehandler_configuration(self, qspec, lspec):
        """Apply config_queue_handler with the given 'queue' (*qspec*) and
        'listener' (*lspec*) specs, run three records through the queue
        handler and check they all reach the file handler via the listener.

        Either spec may be None, in which case the fixture's default is kept.
        """
        cd = copy.deepcopy(self.config_queue_handler)
        fn = make_temp_file('.log', 'test_logging-cqh-')
        cd['handlers']['h1']['filename'] = fn
        if qspec is not None:
            cd['handlers']['ah']['queue'] = qspec
        if lspec is not None:
            cd['handlers']['ah']['listener'] = lspec
        qh = None
        try:
            self.apply_config(cd)
            qh = logging.getHandlerByName('ah')
            self.assertEqual(sorted(logging.getHandlerNames()), ['ah', 'h1'])
            self.assertIsNotNone(qh.listener)
            qh.listener.start()
            logging.debug('foo')
            logging.info('bar')
            logging.warning('baz')

            # Need to let the listener thread finish its work
            while support.sleeping_retry(support.LONG_TIMEOUT,
                                         "queue not empty"):
                if qh.listener.queue.empty():
                    break

            # wait until the handler completed its last task
            qh.listener.queue.join()

            with open(fn, encoding='utf-8') as f:
                data = f.read().splitlines()
            self.assertEqual(data, ['foo', 'bar', 'baz'])
        finally:
            # Stop the listener (if configuration got that far) and arrange
            # for the file handler / temp file to be cleaned up either way.
            if qh:
                qh.listener.stop()
            h = logging.getHandlerByName('h1')
            if h:
                self.addCleanup(closeFileHandler, h, fn)
            else:
                self.addCleanup(os.remove, fn)
3925
    @threading_helper.requires_working_threading()
    @support.requires_subprocess()
    def test_config_queue_handler(self):
        # Drive do_queuehandler_configuration() over the cross-product of
        # valid queue specs (instances, factory strings, '()' dicts) and
        # valid listener specs, then over invalid specs which must all fail
        # with the standard "Unable to configure handler" ValueError.
        qs = [CustomQueue(), CustomQueueProtocol()]
        dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10}
               for cls in ['CustomQueue', 'CustomQueueProtocol']]
        dl = {
            '()': __name__ + '.listenerMaker',
            'arg1': None,
            'arg2': None,
            'respect_handler_level': True
        }
        qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs)
        lvalues = (None, __name__ + '.CustomListener', dl, CustomListener)
        for qspec, lspec in itertools.product(qvalues, lvalues):
            self.do_queuehandler_configuration(qspec, lspec)

        # Some failure cases
        qvalues = (None, 4, int, '', 'foo')
        lvalues = (None, 4, int, '', 'bar')
        for qspec, lspec in itertools.product(qvalues, lvalues):
            if lspec is None and qspec is None:
                continue
            with self.assertRaises(ValueError) as ctx:
                self.do_queuehandler_configuration(qspec, lspec)
            msg = str(ctx.exception)
            self.assertEqual(msg, "Unable to configure handler 'ah'")
3953
    def _apply_simple_queue_listener_configuration(self, qspec):
        """Apply a minimal dictConfig with one QueueHandler whose 'queue'
        entry is *qspec* (instance, dict spec, or other value under test)."""
        self.apply_config({
            "version": 1,
            "handlers": {
                "queue_listener": {
                    "class": "logging.handlers.QueueHandler",
                    "queue": qspec,
                },
            },
        })
3964
    @threading_helper.requires_working_threading()
    @support.requires_subprocess()
    @patch("multiprocessing.Manager")
    def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
        # gh-120868, gh-121723, gh-124653
        # Configuring a QueueHandler with an ordinary (non-multiprocessing)
        # queue must never fall back to creating a multiprocessing.Manager.

        for qspec in [
            {"()": "queue.Queue", "maxsize": -1},
            queue.Queue(),
            # queue.SimpleQueue does not inherit from queue.Queue
            queue.SimpleQueue(),
            # CustomQueueFakeProtocol passes the checks but will not be usable
            # since the signatures are incompatible. Checking the Queue API
            # without testing the type of the actual queue is a trade-off
            # between usability and the work we need to do in order to safely
            # check that the queue object correctly implements the API.
            CustomQueueFakeProtocol(),
            MinimalQueueProtocol(),
        ]:
            with self.subTest(qspec=qspec):
                self._apply_simple_queue_listener_configuration(qspec)
                manager.assert_not_called()
3987
3988    @patch("multiprocessing.Manager")
3989    def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
3990        # gh-120868, gh-121723
3991
3992        for qspec in [object(), CustomQueueWrongProtocol()]:
3993            with self.subTest(qspec=qspec), self.assertRaises(ValueError):
3994                self._apply_simple_queue_listener_configuration(qspec)
3995                manager.assert_not_called()
3996
    @skip_if_tsan_fork
    @support.requires_subprocess()
    @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
                                           " assertions in multiprocessing")
    def test_config_reject_simple_queue_handler_multiprocessing_context(self):
        # multiprocessing.SimpleQueue does not implement 'put_nowait'
        # and thus cannot be used as a queue-like object (gh-124653)

        import multiprocessing

        # fork-based start methods are unavailable on Windows.
        if support.MS_WINDOWS:
            start_methods = ['spawn']
        else:
            start_methods = ['spawn', 'fork', 'forkserver']

        for start_method in start_methods:
            with self.subTest(start_method=start_method):
                ctx = multiprocessing.get_context(start_method)
                qspec = ctx.SimpleQueue()
                with self.assertRaises(ValueError):
                    self._apply_simple_queue_listener_configuration(qspec)
4018
    @skip_if_tsan_fork
    @support.requires_subprocess()
    @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
                                           " assertions in multiprocessing")
    def test_config_queue_handler_multiprocessing_context(self):
        # regression test for gh-121723
        # A manager-proxied Queue configured in a child process (via the
        # _mpinit_issue121723 initializer) must deliver the child's record
        # back to the parent for every start method.
        if support.MS_WINDOWS:
            start_methods = ['spawn']
        else:
            start_methods = ['spawn', 'fork', 'forkserver']
        for start_method in start_methods:
            with self.subTest(start_method=start_method):
                ctx = multiprocessing.get_context(start_method)
                with ctx.Manager() as manager:
                    q = manager.Queue()
                    records = []
                    # use 1 process and 1 task per child to put 1 record
                    with ctx.Pool(1, initializer=self._mpinit_issue121723,
                                  initargs=(q, "text"), maxtasksperchild=1):
                        records.append(q.get(timeout=60))
                    self.assertTrue(q.empty())
                self.assertEqual(len(records), 1)
4041
    @staticmethod
    def _mpinit_issue121723(qspec, message_to_log):
        # static method for pickling support
        # Runs in the child process: configure a QueueHandler on the root
        # logger with the (proxied) queue, then log one message into it.
        logging.config.dictConfig({
            'version': 1,
            'disable_existing_loggers': True,
            'handlers': {
                'log_to_parent': {
                    'class': 'logging.handlers.QueueHandler',
                    'queue': qspec
                }
            },
            'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'}
        })
        # log a message (this creates a record put in the queue)
        logging.getLogger().info(message_to_log)
4058
    @skip_if_tsan_fork
    @support.requires_subprocess()
    def test_multiprocessing_queues(self):
        # See gh-119819
        # Multiprocessing queues supplied in the config must be used as-is
        # (same object on both the handler and the listener), not replaced.

        cd = copy.deepcopy(self.config_queue_handler)
        from multiprocessing import Queue as MQ, Manager as MM
        q1 = MQ()  # this can't be pickled
        q2 = MM().Queue()  # a proxy queue for use when pickling is needed
        q3 = MM().JoinableQueue()  # a joinable proxy queue
        for qspec in (q1, q2, q3):
            fn = make_temp_file('.log', 'test_logging-cmpqh-')
            cd['handlers']['h1']['filename'] = fn
            cd['handlers']['ah']['queue'] = qspec
            qh = None
            try:
                self.apply_config(cd)
                qh = logging.getHandlerByName('ah')
                self.assertEqual(sorted(logging.getHandlerNames()), ['ah', 'h1'])
                self.assertIsNotNone(qh.listener)
                # The configured queue object itself must be wired through.
                self.assertIs(qh.queue, qspec)
                self.assertIs(qh.listener.queue, qspec)
            finally:
                h = logging.getHandlerByName('h1')
                if h:
                    self.addCleanup(closeFileHandler, h, fn)
                else:
                    self.addCleanup(os.remove, fn)
4087
    def test_90195(self):
        # See gh-90195
        # A logger explicitly mentioned in a new config must be re-enabled
        # even after an earlier config disabled it, and regardless of the
        # 'disable_existing_loggers' key being absent.
        config = {
            'version': 1,
            'disable_existing_loggers': False,
            'handlers': {
                'console': {
                    'level': 'DEBUG',
                    'class': 'logging.StreamHandler',
                },
            },
            'loggers': {
                'a': {
                    'level': 'DEBUG',
                    'handlers': ['console']
                }
            }
        }
        logger = logging.getLogger('a')
        self.assertFalse(logger.disabled)
        self.apply_config(config)
        self.assertFalse(logger.disabled)
        # Should disable all loggers ...
        self.apply_config({'version': 1})
        self.assertTrue(logger.disabled)
        del config['disable_existing_loggers']
        self.apply_config(config)
        # Logger should be enabled, since explicitly mentioned
        self.assertFalse(logger.disabled)
4117
    def test_111615(self):
        # See gh-111615
        # Configuring a QueueHandler with a multiprocessing Queue must not
        # deadlock/fail during dictConfig.
        import_helper.import_module('_multiprocessing')  # see gh-113692
        mp = import_helper.import_module('multiprocessing')

        config = {
            'version': 1,
            'handlers': {
                'sink': {
                    'class': 'logging.handlers.QueueHandler',
                    'queue': mp.get_context('spawn').Queue(),
                },
            },
            'root': {
                'handlers': ['sink'],
                'level': 'DEBUG',
            },
        }
        logging.config.dictConfig(config)
4137
    # gh-118868: check if kwargs are passed to logging QueueHandler
    def test_kwargs_passing(self):
        # Extra keys in the handler config must arrive as keyword arguments
        # in the handler's constructor.
        class CustomQueueHandler(logging.handlers.QueueHandler):
            def __init__(self, *args, **kwargs):
                super().__init__(queue.Queue())
                # record the config-supplied kwargs for the assertion below
                self.custom_kwargs = kwargs

        custom_kwargs = {'foo': 'bar'}

        config = {
            'version': 1,
            'handlers': {
                'custom': {
                    'class': CustomQueueHandler,
                    **custom_kwargs
                },
            },
            'root': {
                'level': 'DEBUG',
                'handlers': ['custom']
            }
        }

        logging.config.dictConfig(config)

        handler = logging.getHandlerByName('custom')
        self.assertEqual(handler.custom_kwargs, custom_kwargs)
4165
4166
class ManagerTest(BaseTest):
    """Tests for logging.Manager behaviour."""

    def test_manager_loggerclass(self):
        # setLoggerClass must reject classes that are not Logger subclasses,
        # and the custom class applies only to loggers created through this
        # manager (not to the module-level root logger).
        logged = []

        class MyLogger(logging.Logger):
            def _log(self, level, msg, args, exc_info=None, extra=None):
                logged.append(msg)

        man = logging.Manager(None)
        self.assertRaises(TypeError, man.setLoggerClass, int)
        man.setLoggerClass(MyLogger)
        logger = man.getLogger('test')
        logger.warning('should appear in logged')
        logging.warning('should not appear in logged')

        self.assertEqual(logged, ['should appear in logged'])

    def test_set_log_record_factory(self):
        # setLogRecordFactory stores the factory object on the manager as-is.
        man = logging.Manager(None)
        expected = object()
        man.setLogRecordFactory(expected)
        self.assertEqual(man.logRecordFactory, expected)
4189
class ChildLoggerTest(BaseTest):
    """Tests for Logger.getChild() and Logger.getChildren()."""

    def test_child_loggers(self):
        # getChild returns the same object as getLogger with the dotted
        # name, including for multi-level suffixes like 'def.ghi'.
        r = logging.getLogger()
        l1 = logging.getLogger('abc')
        l2 = logging.getLogger('def.ghi')
        c1 = r.getChild('xyz')
        c2 = r.getChild('uvw.xyz')
        self.assertIs(c1, logging.getLogger('xyz'))
        self.assertIs(c2, logging.getLogger('uvw.xyz'))
        c1 = l1.getChild('def')
        c2 = c1.getChild('ghi')
        c3 = l1.getChild('def.ghi')
        self.assertIs(c1, logging.getLogger('abc.def'))
        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
        self.assertIs(c2, c3)

    def test_get_children(self):
        # getChildren returns only *direct* child Logger objects —
        # 'foo.bar.baz.bozz' is not a child of 'foo.bar'.
        r = logging.getLogger()
        l1 = logging.getLogger('foo')
        l2 = logging.getLogger('foo.bar')
        l3 = logging.getLogger('foo.bar.baz.bozz')
        l4 = logging.getLogger('bar')
        kids = r.getChildren()
        expected = {l1, l4}
        self.assertEqual(expected, kids & expected)  # might be other kids for root
        self.assertNotIn(l2, expected)
        kids = l1.getChildren()
        self.assertEqual({l2}, kids)
        kids = l2.getChildren()
        self.assertEqual(set(), kids)
4220
# Trivial LogRecord subclass; LogRecordFactoryTest installs it via
# logging.setLogRecordFactory() to verify the factory is honoured.
class DerivedLogRecord(logging.LogRecord):
    pass
4223
class LogRecordFactoryTest(BaseTest):
    """Check that logging honours a record factory installed with
    logging.setLogRecordFactory()."""

    def setUp(self):
        class CheckingFilter(logging.Filter):
            # Rejects (by raising TypeError) any record whose concrete
            # type is not exactly the one given at construction time.
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                record_type = type(record)
                if record_type is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (record_type, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        # Remember the factory in force so tearDown can restore it.
        self.orig_factory = logging.getLogRecordFactory()

    def tearDown(self):
        self.root_logger.removeFilter(self.filter)
        BaseTest.tearDown(self)
        logging.setLogRecordFactory(self.orig_factory)

    def test_logrecord_class(self):
        """Before the factory is switched the filter sees a plain
        LogRecord and raises; afterwards DerivedLogRecord is produced."""
        self.assertRaises(TypeError, self.root_logger.warning,
                          self.next_message())
        logging.setLogRecordFactory(DerivedLogRecord)
        self.root_logger.error(self.next_message())
        self.assert_log_lines([
           ('root', 'ERROR', '2'),
        ])
4257
4258
@threading_helper.requires_working_threading()
class QueueHandlerTest(BaseTest):
    """Tests for QueueHandler, alone and paired with QueueListener."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        # A dedicated 'que' logger wired only to a QueueHandler backed by
        # an unbounded queue.Queue; propagation is disabled so records do
        # not also reach the root handlers installed by BaseTest.
        BaseTest.setUp(self)
        self.queue = queue.Queue(-1)
        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
        self.name = 'que'
        self.que_logger = logging.getLogger('que')
        self.que_logger.propagate = False
        self.que_logger.setLevel(logging.WARNING)
        self.que_logger.addHandler(self.que_hdlr)

    def tearDown(self):
        self.que_hdlr.close()
        BaseTest.tearDown(self)

    def test_queue_handler(self):
        """Records below the logger's WARNING level must not be enqueued;
        an enqueued record is a LogRecord with args already merged."""
        self.que_logger.debug(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        self.que_logger.info(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        msg = self.next_message()
        self.que_logger.warning(msg)
        data = self.queue.get_nowait()
        self.assertTrue(isinstance(data, logging.LogRecord))
        self.assertEqual(data.name, self.que_logger.name)
        # args is None on the queued record: the message was pre-merged.
        self.assertEqual((data.msg, data.args), (msg, None))

    def test_formatting(self):
        """A formatter set on the QueueHandler is applied before the
        record is enqueued: both .msg and .message carry the formatted
        text, as asserted below."""
        msg = self.next_message()
        levelname = logging.getLevelName(logging.WARNING)
        # {}-style mirror of self.log_format, used to build the expected
        # output independently of the handler's own formatting.
        log_format_str = '{name} -> {levelname}: {message}'
        formatted_msg = log_format_str.format(name=self.name,
                                              levelname=levelname, message=msg)
        formatter = logging.Formatter(self.log_format)
        self.que_hdlr.setFormatter(formatter)
        self.que_logger.warning(msg)
        log_record = self.queue.get_nowait()
        self.assertEqual(formatted_msg, log_record.msg)
        self.assertEqual(formatted_msg, log_record.message)

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener(self):
        """A QueueListener drains the queue into its handler(s); with
        respect_handler_level=True the handler's own level is honoured."""
        handler = TestHandler(support.Matcher())
        listener = logging.handlers.QueueListener(self.queue, handler)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
            listener.stop()  # gh-114706 - ensure no crash if called again
        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
        handler.close()

        # Now test with respect_handler_level set

        handler = TestHandler(support.Matcher())
        handler.setLevel(logging.CRITICAL)
        listener = logging.handlers.QueueListener(self.queue, handler,
                                                  respect_handler_level=True)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        # Only the CRITICAL record passes the handler's level filter.
        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
        handler.close()

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener_with_StreamHandler(self):
        # Test that traceback and stack-info only appends once (bpo-34334, bpo-46755).
        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
        listener.start()
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.que_logger.exception(self.next_message(), exc_info=exc)
        self.que_logger.error(self.next_message(), stack_info=True)
        listener.stop()
        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
        self.assertEqual(self.stream.getvalue().strip().count('Stack'), 1)

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener_with_multiple_handlers(self):
        # Test that queue handler format doesn't affect other handler formats (bpo-35726).
        self.que_hdlr.setFormatter(self.root_formatter)
        self.que_logger.addHandler(self.root_hdlr)

        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
        listener.start()
        self.que_logger.error("error")
        listener.stop()
        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
4367
if hasattr(logging.handlers, 'QueueListener'):
    import multiprocessing
    from unittest.mock import patch

    @skip_if_tsan_fork
    @threading_helper.requires_working_threading()
    class QueueListenerTest(BaseTest):
        """
        Tests based on patch submitted for issue #27930. Ensure that
        QueueListener handles all log messages.
        """

        # Number of times each scenario is repeated to shake out races.
        repeat = 20

        @staticmethod
        def setup_and_log(log_queue, ident):
            """
            Creates a logger with a QueueHandler that logs to a queue read by a
            QueueListener. Starts the listener, logs five messages, and stops
            the listener.
            """
            logger = logging.getLogger('test_logger_with_id_%s' % ident)
            logger.setLevel(logging.DEBUG)
            handler = logging.handlers.QueueHandler(log_queue)
            logger.addHandler(handler)
            listener = logging.handlers.QueueListener(log_queue)
            listener.start()

            logger.info('one')
            logger.info('two')
            logger.info('three')
            logger.info('four')
            logger.info('five')

            listener.stop()
            logger.removeHandler(handler)
            handler.close()

        @patch.object(logging.handlers.QueueListener, 'handle')
        def test_handle_called_with_queue_queue(self, mock_handle):
            """All five messages per iteration must reach handle()."""
            for i in range(self.repeat):
                log_queue = queue.Queue()
                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
                             'correct number of handled log messages')

        @patch.object(logging.handlers.QueueListener, 'handle')
        def test_handle_called_with_mp_queue(self, mock_handle):
            """Same as above but through a multiprocessing.Queue."""
            # bpo-28668: The multiprocessing (mp) module is not functional
            # when the mp.synchronize module cannot be imported.
            support.skip_if_broken_multiprocessing_synchronize()
            for i in range(self.repeat):
                log_queue = multiprocessing.Queue()
                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
                log_queue.close()
                log_queue.join_thread()
            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
                             'correct number of handled log messages')

        @staticmethod
        def get_all_from_queue(log_queue):
            """Yield everything currently retrievable from log_queue."""
            try:
                while True:
                    yield log_queue.get_nowait()
            except queue.Empty:
                # Plain 'return' ends the generator; the previous
                # 'return []' only set an unreachable StopIteration.value
                # and was misleading.
                return

        def test_no_messages_in_queue_after_stop(self):
            """
            Five messages are logged then the QueueListener is stopped. This
            test then gets everything off the queue. Failure of this test
            indicates that messages were not registered on the queue until
            _after_ the QueueListener stopped.
            """
            # bpo-28668: The multiprocessing (mp) module is not functional
            # when the mp.synchronize module cannot be imported.
            support.skip_if_broken_multiprocessing_synchronize()
            for i in range(self.repeat):
                # Named log_queue (not 'queue') so the imported queue
                # module is not shadowed, matching the sibling tests.
                log_queue = multiprocessing.Queue()
                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
                items = list(self.get_all_from_queue(log_queue))
                log_queue.close()
                log_queue.join_thread()

                # Only the stop sentinel (or nothing at all, if the
                # listener already consumed it) may remain.
                expected = [[], [logging.handlers.QueueListener._sentinel]]
                self.assertIn(items, expected,
                              'Found unexpected messages in queue: %s' % (
                                    [m.msg if isinstance(m, logging.LogRecord)
                                     else m for m in items]))

        def test_calls_task_done_after_stop(self):
            # Issue 36813: Make sure queue.join does not deadlock.
            log_queue = queue.Queue()
            listener = logging.handlers.QueueListener(log_queue)
            listener.start()
            listener.stop()
            with self.assertRaises(ValueError):
                # Make sure all tasks are done and .join won't block.
                log_queue.task_done()
4468
4469
# Zero offset shared by the UTC tzinfo below.
ZERO = datetime.timedelta(0)

class UTC(datetime.tzinfo):
    """Minimal concrete tzinfo for UTC, used by timestamp tests."""

    def utcoffset(self, dt):
        # UTC is always exactly UTC.
        return ZERO

    def dst(self, dt):
        # No daylight saving time in UTC.
        return ZERO

    def tzname(self, dt):
        return 'UTC'

# Single shared instance used throughout the tests.
utc = UTC()
4482
class AssertErrorMessage:
    # Mixin for TestCase subclasses: check both the type and the exact
    # str() of an expected exception.

    def assert_error_message(self, exception, message, *args, **kwargs):
        """Assert that calling args[0](*args[1:], **kwargs) raises
        `exception` whose str() equals `message`.

        The empty tuple given to assertRaises matches no exception type,
        so whatever args[0] raises propagates out of assertRaises and is
        caught by the except clause below.  If nothing is raised,
        assertRaises raises self.failureException instead, which (not
        being `exception`) propagates and fails the test.

        NOTE(review): if no callable is supplied in *args, assertRaises
        merely returns an unused context manager and this method checks
        nothing -- callers must always pass the callable and its
        arguments.
        """
        try:
            self.assertRaises((), *args, **kwargs)
        except exception as e:
            self.assertEqual(message, str(e))
4490
class FormatterTest(unittest.TestCase, AssertErrorMessage):
    """Tests for logging.Formatter: %-, {- and $-style formatting and
    validation, the defaults parameter, and timestamp handling."""

    def setUp(self):
        # Baseline attribute dict for a LogRecord; 'variants' holds extra
        # attribute sets merged in by get_record(name).
        self.common = {
            'name': 'formatter.test',
            'level': logging.DEBUG,
            'pathname': os.path.join('path', 'to', 'dummy.ext'),
            'lineno': 42,
            'exc_info': None,
            'func': None,
            'msg': 'Message with %d %s',
            'args': (2, 'placeholders'),
        }
        self.variants = {
            'custom': {
                'custom': 1234
            }
        }

    def get_record(self, name=None):
        """Return a LogRecord built from self.common, optionally updated
        with the named variant's attributes."""
        result = dict(self.common)
        if name is not None:
            result.update(self.variants[name])
        return logging.makeLogRecord(result)

    def test_percent(self):
        # Test %-formatting
        r = self.get_record()
        f = logging.Formatter('${%(message)s}')
        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
        f = logging.Formatter('%(random)s')
        self.assertRaises(ValueError, f.format, r)
        self.assertFalse(f.usesTime())
        f = logging.Formatter('%(asctime)s')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('%(asctime)-15s')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('%(asctime)#15s')
        self.assertTrue(f.usesTime())

    def test_braces(self):
        # Test {}-formatting
        r = self.get_record()
        f = logging.Formatter('$%{message}%$', style='{')
        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
        f = logging.Formatter('{random}', style='{')
        self.assertRaises(ValueError, f.format, r)
        f = logging.Formatter("{message}", style='{')
        self.assertFalse(f.usesTime())
        f = logging.Formatter('{asctime}', style='{')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('{asctime!s:15}', style='{')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('{asctime:15}', style='{')
        self.assertTrue(f.usesTime())

    def test_dollars(self):
        # Test $-formatting
        r = self.get_record()
        f = logging.Formatter('${message}', style='$')
        self.assertEqual(f.format(r), 'Message with 2 placeholders')
        f = logging.Formatter('$message', style='$')
        self.assertEqual(f.format(r), 'Message with 2 placeholders')
        f = logging.Formatter('$$%${message}%$$', style='$')
        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
        f = logging.Formatter('${random}', style='$')
        self.assertRaises(ValueError, f.format, r)
        self.assertFalse(f.usesTime())
        f = logging.Formatter('${asctime}', style='$')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('$asctime', style='$')
        self.assertTrue(f.usesTime())
        f = logging.Formatter('${message}', style='$')
        self.assertFalse(f.usesTime())
        f = logging.Formatter('${asctime}--', style='$')
        self.assertTrue(f.usesTime())

    def test_format_validate(self):
        """Exercise the format-string validation performed at Formatter
        construction time for all three styles."""
        # Check correct formatting
        # Percentage style
        f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
        self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
        f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
        self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
        f = logging.Formatter("%(process)#+027.23X")
        self.assertEqual(f._fmt, "%(process)#+027.23X")
        f = logging.Formatter("%(foo)#.*g")
        self.assertEqual(f._fmt, "%(foo)#.*g")

        # StrFormat Style
        f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{")
        self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}")
        f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{")
        self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}")
        f = logging.Formatter("{customfield!s:#<30}", style="{")
        self.assertEqual(f._fmt, "{customfield!s:#<30}")
        f = logging.Formatter("{message!r}", style="{")
        self.assertEqual(f._fmt, "{message!r}")
        f = logging.Formatter("{message!s}", style="{")
        self.assertEqual(f._fmt, "{message!s}")
        f = logging.Formatter("{message!a}", style="{")
        self.assertEqual(f._fmt, "{message!a}")
        f = logging.Formatter("{process!r:4.2}", style="{")
        self.assertEqual(f._fmt, "{process!r:4.2}")
        f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{")
        self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}")
        f = logging.Formatter("{process!s:{w},.{p}}", style="{")
        self.assertEqual(f._fmt, "{process!s:{w},.{p}}")
        f = logging.Formatter("{foo:12.{p}}", style="{")
        self.assertEqual(f._fmt, "{foo:12.{p}}")
        f = logging.Formatter("{foo:{w}.6}", style="{")
        self.assertEqual(f._fmt, "{foo:{w}.6}")
        f = logging.Formatter("{foo[0].bar[1].baz}", style="{")
        self.assertEqual(f._fmt, "{foo[0].bar[1].baz}")
        f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{")
        self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}")
        f = logging.Formatter("{12[k1].bar[k2].baz}", style="{")
        self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}")

        # Dollar style
        f = logging.Formatter("${asctime} - $message", style="$")
        self.assertEqual(f._fmt, "${asctime} - $message")
        f = logging.Formatter("$bar $$", style="$")
        self.assertEqual(f._fmt, "$bar $$")
        f = logging.Formatter("$bar $$$$", style="$")
        self.assertEqual(f._fmt, "$bar $$$$")  # this would print two $($$)

        # Testing when ValueError being raised from incorrect format
        # Percentage Style
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)b")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s")
        self.assertRaises(ValueError, logging.Formatter, "%(asctime)_")
        self.assertRaises(ValueError, logging.Formatter, '{asctime}')
        self.assertRaises(ValueError, logging.Formatter, '${message}')
        self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f')  # with both * and decimal number as precision
        self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f')

        # StrFormat Style
        # Testing failure for '-' in field name
        self.assert_error_message(
            ValueError,
            "invalid format: invalid field name/expression: 'name-thing'",
            logging.Formatter, "{name-thing}", style="{"
        )
        # Testing failure for style mismatch
        self.assert_error_message(
            ValueError,
            "invalid format: no fields",
            logging.Formatter, '%(asctime)s', style='{'
        )
        # Testing failure for invalid conversion.
        # Fix: the previous call passed no callable, so assert_error_message
        # never invoked anything and the check was a silent no-op.  Supply
        # the Formatter call and expect the message as wrapped by the
        # style's validate() ('invalid format: %s'), consistent with the
        # other assert_error_message expectations in this test.
        self.assert_error_message(
            ValueError,
            "invalid format: invalid conversion: 'Z'",
            logging.Formatter, '{asctime!Z}', style='{'
        )
        self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{')
        self.assert_error_message(
            ValueError,
            "invalid format: expected ':' after conversion specifier",
            logging.Formatter, '{asctime!aa:15}', style='{'
        )
        # Testing failure for invalid spec
        self.assert_error_message(
            ValueError,
            "invalid format: bad specifier: '.2ff'",
            logging.Formatter, '{process:.2ff}', style='{'
        )
        self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{')
        # Testing failure for mismatch braces
        self.assert_error_message(
            ValueError,
            "invalid format: expected '}' before end of string",
            logging.Formatter, '{process', style='{'
        )
        self.assert_error_message(
            ValueError,
            "invalid format: Single '}' encountered in format string",
            logging.Formatter, 'process}', style='{'
        )
        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{')
        self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{')

        # Dollar style
        # Testing failure for mismatch bare $
        self.assert_error_message(
            ValueError,
            "invalid format: bare \'$\' not allowed",
            logging.Formatter, '$bar $$$', style='$'
        )
        self.assert_error_message(
            ValueError,
            "invalid format: bare \'$\' not allowed",
            logging.Formatter, 'bar $', style='$'
        )
        self.assert_error_message(
            ValueError,
            "invalid format: bare \'$\' not allowed",
            logging.Formatter, 'foo $.', style='$'
        )
        # Testing failure for mismatch style
        self.assert_error_message(
            ValueError,
            "invalid format: no fields",
            logging.Formatter, '{asctime}', style='$'
        )
        self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$')

        # Testing failure for incorrect fields
        self.assert_error_message(
            ValueError,
            "invalid format: no fields",
            logging.Formatter, 'foo', style='$'
        )
        self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$')

    def test_defaults_parameter(self):
        """The defaults mapping fills in missing record attributes but is
        overridden by attributes actually present on the record."""
        fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message']
        styles = ['%', '{', '$']
        for fmt, style in zip(fmts, styles):
            f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'})
            r = self.get_record()
            self.assertEqual(f.format(r), 'Default Message with 2 placeholders')
            r = self.get_record("custom")
            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')

            # Without default
            f = logging.Formatter(fmt, style=style)
            r = self.get_record()
            self.assertRaises(ValueError, f.format, r)

            # Non-existing default is ignored
            f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'})
            r = self.get_record("custom")
            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')

    def test_invalid_style(self):
        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')

    def test_time(self):
        """formatTime() honours the converter and default/explicit date
        formats, and format() caches the result on record.asctime."""
        r = self.get_record()
        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
        # We use None to indicate we want the local timezone
        # We're essentially converting a UTC time to local time
        r.created = time.mktime(dt.astimezone(None).timetuple())
        r.msecs = 123
        f = logging.Formatter('%(asctime)s %(message)s')
        f.converter = time.gmtime
        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
        f.format(r)
        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')

    def test_default_msec_format_none(self):
        """With default_msec_format None, no milliseconds are appended."""
        class NoMsecFormatter(logging.Formatter):
            default_msec_format = None
            default_time_format = '%d/%m/%Y %H:%M:%S'

        r = self.get_record()
        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc)
        r.created = time.mktime(dt.astimezone(None).timetuple())
        f = NoMsecFormatter()
        f.converter = time.gmtime
        self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00')

    def test_issue_89047(self):
        """msecs must never render as '1000' (gh-89047 rounding bug)."""
        f = logging.Formatter(fmt='{asctime}.{msecs:03.0f} {message}', style='{', datefmt="%Y-%m-%d %H:%M:%S")
        for i in range(2500):
            time.sleep(0.0004)
            r = logging.makeLogRecord({'msg': 'Message %d' % (i + 1)})
            s = f.format(r)
            self.assertNotIn('.1000', s)

    def test_msecs_has_no_floating_point_precision_loss(self):
        # See issue gh-102402
        tests = (
            # time_ns is approx. 2023-03-04 04:25:20 UTC
            # (time_ns, expected_msecs_value)
            (1_677_902_297_100_000_000, 100.0),  # exactly 100ms
            (1_677_903_920_999_998_503, 999.0),  # check truncating doesn't round
            (1_677_903_920_000_998_503, 0.0),  # check truncating doesn't round
            (1_677_903_920_999_999_900, 0.0), # check rounding up
        )
        for ns, want in tests:
            with patch('time.time_ns') as patched_ns:
                patched_ns.return_value = ns
                record = logging.makeLogRecord({'msg': 'test'})
            with self.subTest(ns):
                self.assertEqual(record.msecs, want)
                self.assertEqual(record.created, ns / 1e9)
                self.assertAlmostEqual(record.created - int(record.created),
                                       record.msecs / 1e3,
                                       delta=1e-3)

    def test_relativeCreated_has_higher_precision(self):
        # See issue gh-102402.
        # Run the code in the subprocess, because the time module should
        # be patched before the first import of the logging package.
        # Temporary unloading and re-importing the logging package has
        # side effects (including registering the atexit callback and
        # references leak).
        start_ns = 1_677_903_920_000_998_503  # approx. 2023-03-04 04:25:20 UTC
        offsets_ns = (200, 500, 12_354, 99_999, 1_677_903_456_999_123_456)
        code = textwrap.dedent(f"""
            start_ns = {start_ns!r}
            offsets_ns = {offsets_ns!r}
            start_monotonic_ns = start_ns - 1

            import time
            # Only time.time_ns needs to be patched for the current
            # implementation, but patch also other functions to make
            # the test less implementation depending.
            old_time_ns = time.time_ns
            old_time = time.time
            old_monotonic_ns = time.monotonic_ns
            old_monotonic = time.monotonic
            time_ns_result = start_ns
            time.time_ns = lambda: time_ns_result
            time.time = lambda: time.time_ns()/1e9
            time.monotonic_ns = lambda: time_ns_result - start_monotonic_ns
            time.monotonic = lambda: time.monotonic_ns()/1e9
            try:
                import logging

                for offset_ns in offsets_ns:
                    # mock for log record creation
                    time_ns_result = start_ns + offset_ns
                    record = logging.makeLogRecord({{'msg': 'test'}})
                    print(record.created, record.relativeCreated)
            finally:
                time.time_ns = old_time_ns
                time.time = old_time
                time.monotonic_ns = old_monotonic_ns
                time.monotonic = old_monotonic
        """)
        rc, out, err = assert_python_ok("-c", code)
        out = out.decode()
        for offset_ns, line in zip(offsets_ns, out.splitlines(), strict=True):
            with self.subTest(offset_ns=offset_ns):
                created, relativeCreated = map(float, line.split())
                self.assertAlmostEqual(created, (start_ns + offset_ns) / 1e9, places=6)
                # After PR gh-102412, precision (places) increases from 3 to 7
                self.assertAlmostEqual(relativeCreated, offset_ns / 1e6, places=7)
4844
4845
class TestBufferingFormatter(logging.BufferingFormatter):
    """BufferingFormatter variant that brackets the output with the
    number of buffered records, e.g. '[(2)...(2)]' for two records."""

    def formatHeader(self, records):
        return f'[({len(records)})'

    def formatFooter(self, records):
        return f'({len(records)})]'
4852
4853class BufferingFormatterTest(unittest.TestCase):
4854    def setUp(self):
4855        self.records = [
4856            logging.makeLogRecord({'msg': 'one'}),
4857            logging.makeLogRecord({'msg': 'two'}),
4858        ]
4859
4860    def test_default(self):
4861        f = logging.BufferingFormatter()
4862        self.assertEqual('', f.format([]))
4863        self.assertEqual('onetwo', f.format(self.records))
4864
4865    def test_custom(self):
4866        f = TestBufferingFormatter()
4867        self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
4868        lf = logging.Formatter('<%(message)s>')
4869        f = TestBufferingFormatter(lf)
4870        self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
4871
class ExceptionTest(BaseTest):
    """Tests for exception and stack-info formatting on LogRecords."""

    def test_formatting(self):
        """logging.exception() must attach a traceback (exc_text) and,
        with stack_info=True, a formatted stack (stack_info)."""
        root = self.root_logger
        handler = RecordingHandler()
        root.addHandler(handler)
        try:
            raise RuntimeError('deliberate mistake')
        except RuntimeError:
            # Catch the specific type rather than the previous bare
            # 'except:', which would also have swallowed SystemExit and
            # KeyboardInterrupt.  The stack_info assertion below matches
            # the literal text of the next line; keep it unchanged.
            logging.exception('failed', stack_info=True)
        root.removeHandler(handler)
        handler.close()
        record = handler.records[0]
        self.assertTrue(record.exc_text.startswith('Traceback (most recent '
                                                   'call last):\n'))
        self.assertTrue(record.exc_text.endswith('\nRuntimeError: '
                                                 'deliberate mistake'))
        self.assertTrue(record.stack_info.startswith('Stack (most recent '
                                                     'call last):\n'))
        self.assertTrue(record.stack_info.endswith('logging.exception(\'failed\', '
                                                   'stack_info=True)'))
4892
4893
class LastResortTest(BaseTest):
    """Tests for the module-level handler of last resort."""

    def test_last_resort(self):
        # With no handlers configured, warnings and above go through
        # logging.lastResort, which writes the bare message to stderr.
        root = self.root_logger
        root.removeHandler(self.root_hdlr)
        saved_lastresort = logging.lastResort
        saved_raise_exceptions = logging.raiseExceptions

        try:
            with support.captured_stderr() as captured:
                root.debug('This should not appear')
                self.assertEqual('', captured.getvalue())
                root.warning('Final chance!')
                self.assertEqual('Final chance!\n', captured.getvalue())

            # With no handlers and no last resort either, a one-off
            # diagnostic message is printed instead.
            logging.lastResort = None
            with support.captured_stderr() as captured:
                root.warning('Final chance!')
                expected = 'No handlers could be found for logger "root"\n'
                self.assertEqual(expected, captured.getvalue())

            # That diagnostic is only ever printed once per manager.
            with support.captured_stderr() as captured:
                root.warning('Final chance!')
                self.assertEqual('', captured.getvalue())

            # With raiseExceptions false the diagnostic is suppressed
            # entirely, even after resetting the once-only flag.
            root.manager.emittedNoHandlerWarning = False
            logging.raiseExceptions = False
            with support.captured_stderr() as captured:
                root.warning('Final chance!')
                self.assertEqual('', captured.getvalue())
        finally:
            root.addHandler(self.root_hdlr)
            logging.lastResort = saved_lastresort
            logging.raiseExceptions = saved_raise_exceptions
4931
4932
class FakeHandler:
    """Stand-in handler recording which lifecycle methods were invoked.

    Each of acquire/flush/close/release is replaced by a callable that
    appends '<identifier> - <method>' to the shared `called` list.
    """

    def __init__(self, identifier, called):
        for name in ('acquire', 'flush', 'close', 'release'):
            setattr(self, name, self.record_call(identifier, name, called))

    def record_call(self, identifier, method_name, called):
        # Return a zero-argument callable that logs this invocation.
        def inner():
            entry = '{} - {}'.format(identifier, method_name)
            called.append(entry)
        return inner
4943
4944
class RecordingHandler(logging.NullHandler):
    """A NullHandler that additionally remembers every record it handles."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.records = []

    def handle(self, record):
        """Keep track of all the emitted records."""
        self.records.append(record)
4954
4955
class ShutdownTest(BaseTest):

    """Test suite for the shutdown method."""

    def setUp(self):
        super().setUp()
        self.called = []

        # Several tests flip logging.raiseExceptions; restore it afterwards.
        saved_raise = logging.raiseExceptions
        self.addCleanup(setattr, logging, 'raiseExceptions', saved_raise)

    def raise_error(self, error):
        # Build a zero-argument callable that raises `error` when invoked.
        def inner():
            raise error()
        return inner

    def test_no_failure(self):
        # Three well-behaved fake handlers record every lifecycle call.
        fakes = [FakeHandler(index, self.called) for index in range(3)]

        # shutdown() operates on weak references to live handlers.
        refs = [logging.weakref.ref(fake) for fake in fakes]

        logging.shutdown(handlerList=list(refs))

        # Handlers are torn down newest-first, each going through
        # acquire -> flush -> close -> release.
        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
        self.assertEqual(expected, self.called)

    def _test_with_failure_in_method(self, method, error):
        # Whatever lifecycle method fails, release() must still run last
        # so the handler lock is not leaked.
        handler = FakeHandler(0, self.called)
        setattr(handler, method, self.raise_error(error))
        refs = [logging.weakref.ref(handler)]

        logging.shutdown(handlerList=list(refs))

        self.assertEqual('0 - release', self.called[-1])

    # OSError raised during shutdown is always swallowed...
    def test_with_ioerror_in_acquire(self):
        self._test_with_failure_in_method('acquire', OSError)

    def test_with_ioerror_in_flush(self):
        self._test_with_failure_in_method('flush', OSError)

    def test_with_ioerror_in_close(self):
        self._test_with_failure_in_method('close', OSError)

    # ... and so is ValueError (e.g. operating on a closed stream).
    def test_with_valueerror_in_acquire(self):
        self._test_with_failure_in_method('acquire', ValueError)

    def test_with_valueerror_in_flush(self):
        self._test_with_failure_in_method('flush', ValueError)

    def test_with_valueerror_in_close(self):
        self._test_with_failure_in_method('close', ValueError)

    # Other exceptions are swallowed only while raiseExceptions is false...
    def test_with_other_error_in_acquire_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method('acquire', IndexError)

    def test_with_other_error_in_flush_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method('flush', IndexError)

    def test_with_other_error_in_close_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method('close', IndexError)

    # ... and propagate when it is true.
    def test_with_other_error_in_acquire_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('acquire', IndexError)

    def test_with_other_error_in_flush_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('flush', IndexError)

    def test_with_other_error_in_close_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('close', IndexError)
5041
5042
class ModuleLevelMiscTest(BaseTest):

    """Test suite for some module level methods."""

    def test_disable(self):
        old_disable = logging.root.manager.disable
        # confirm our assumptions are correct
        self.assertEqual(old_disable, 0)
        self.addCleanup(logging.disable, old_disable)

        logging.disable(83)
        self.assertEqual(logging.root.manager.disable, 83)

        # Unknown level names are rejected with ValueError.
        self.assertRaises(ValueError, logging.disable, "doesnotexists")

        class _NotAnIntOrString:
            pass

        # Arguments that are neither int nor str raise TypeError.
        self.assertRaises(TypeError, logging.disable, _NotAnIntOrString())

        # A registered level name is accepted.
        logging.disable("WARN")

        # test the default value introduced in 3.7
        # (Issue #28524)
        logging.disable()
        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)

    def _test_log(self, method, level=None):
        # Common driver: with a handler installed on the root logger, the
        # module-level convenience functions must log directly and must NOT
        # fall back to calling basicConfig().
        called = []
        support.patch(self, logging, 'basicConfig',
                      lambda *a, **kw: called.append((a, kw)))

        recording = RecordingHandler()
        logging.root.addHandler(recording)

        log_method = getattr(logging, method)
        if level is not None:
            log_method(level, "test me: %r", recording)
        else:
            log_method("test me: %r", recording)

        self.assertEqual(len(recording.records), 1)
        record = recording.records[0]
        self.assertEqual(record.getMessage(), "test me: %r" % recording)

        # log() takes an explicit level; the named helpers imply their own.
        expected_level = level if level is not None else getattr(logging, method.upper())
        self.assertEqual(record.levelno, expected_level)

        # basicConfig was not called!
        self.assertEqual(called, [])

    def test_log(self):
        self._test_log('log', logging.ERROR)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')

    def test_set_logger_class(self):
        # setLoggerClass() rejects classes not derived from logging.Logger.
        self.assertRaises(TypeError, logging.setLoggerClass, object)

        class MyLogger(logging.Logger):
            pass

        logging.setLoggerClass(MyLogger)
        self.assertEqual(logging.getLoggerClass(), MyLogger)

        logging.setLoggerClass(logging.Logger)
        self.assertEqual(logging.getLoggerClass(), logging.Logger)

    def test_subclass_logger_cache(self):
        # bpo-37258
        message = []

        class MyLogger(logging.getLoggerClass()):
            def __init__(self, name='MyLogger', level=logging.NOTSET):
                super().__init__(name, level)
                message.append('initialized')

        logging.setLoggerClass(MyLogger)
        logger = logging.getLogger('just_some_logger')
        # The logger must be constructed exactly once, not re-created on
        # subsequent lookups from the manager's cache.
        self.assertEqual(message, ['initialized'])
        stream = io.StringIO()
        h = logging.StreamHandler(stream)
        logger.addHandler(h)
        try:
            logger.setLevel(logging.DEBUG)
            logger.debug("hello")
            self.assertEqual(stream.getvalue().strip(), "hello")

            stream.truncate(0)
            stream.seek(0)

            logger.setLevel(logging.INFO)
            logger.debug("hello")
            self.assertEqual(stream.getvalue(), "")
        finally:
            logger.removeHandler(h)
            h.close()
            logging.setLoggerClass(logging.Logger)

    def test_logging_at_shutdown(self):
        # bpo-20037: Doing text I/O late at interpreter shutdown must not crash
        code = textwrap.dedent("""
            import logging

            class A:
                def __del__(self):
                    try:
                        raise ValueError("some error")
                    except Exception:
                        logging.exception("exception in __del__")

            a = A()
        """)
        rc, out, err = assert_python_ok("-c", code)
        err = err.decode()
        self.assertIn("exception in __del__", err)
        self.assertIn("ValueError: some error", err)

    def test_logging_at_shutdown_open(self):
        # bpo-26789: FileHandler keeps a reference to the builtin open()
        # function to be able to open or reopen the file during Python
        # finalization.
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        code = textwrap.dedent(f"""
            import builtins
            import logging

            class A:
                def __del__(self):
                    logging.error("log in __del__")

            # basicConfig() opens the file, but logging.shutdown() closes
            # it at Python exit. When A.__del__() is called,
            # FileHandler._open() must be called again to re-open the file.
            logging.basicConfig(filename={filename!r}, encoding="utf-8")

            a = A()

            # Simulate the Python finalization which removes the builtin
            # open() function.
            del builtins.open
        """)
        assert_python_ok("-c", code)

        with open(filename, encoding="utf-8") as fp:
            self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__")

    def test_recursion_error(self):
        # Issue 36272
        # Logging from inside unbounded recursion must exit with an
        # ordinary RecursionError, not a fatal interpreter abort.
        code = textwrap.dedent("""
            import logging

            def rec():
                logging.error("foo")
                rec()

            rec()
        """)
        rc, out, err = assert_python_failure("-c", code)
        err = err.decode()
        self.assertNotIn("Cannot recover from stack overflow.", err)
        self.assertEqual(rc, 1)

    def test_get_level_names_mapping(self):
        mapping = logging.getLevelNamesMapping()
        self.assertEqual(logging._nameToLevel, mapping)  # value is equivalent
        self.assertIsNot(logging._nameToLevel, mapping)  # but not the internal data
        new_mapping = logging.getLevelNamesMapping()     # another call -> another copy
        self.assertIsNot(mapping, new_mapping)           # verify not the same object as before
        self.assertEqual(mapping, new_mapping)           # but equivalent in value
5228
5229
class LogRecordTest(BaseTest):
    """Tests for LogRecord creation and its optional context attributes."""

    def test_str_rep(self):
        # str() of a LogRecord has the form '<LogRecord: ...>'.
        r = logging.makeLogRecord({})
        s = str(r)
        self.assertTrue(s.startswith('<LogRecord: '))
        self.assertTrue(s.endswith('>'))

    def test_dict_arg(self):
        # A single dict argument is kept as-is (not wrapped in a tuple)
        # so that %(key)s-style message formatting works.
        h = RecordingHandler()
        r = logging.getLogger()
        r.addHandler(h)
        d = {'less' : 'more' }
        logging.warning('less is %(less)s', d)
        self.assertIs(h.records[0].args, d)
        self.assertEqual(h.records[0].message, 'less is more')
        r.removeHandler(h)
        h.close()

    @staticmethod # pickled as target of child process in the following test
    def _extract_logrecord_process_name(key, logMultiprocessing, conn=None):
        # Build two records under the given logMultiprocessing setting:
        # r1 with the multiprocessing module importable, r2 with it made
        # unimportable, and report the processName seen on each.
        prev_logMultiprocessing = logging.logMultiprocessing
        logging.logMultiprocessing = logMultiprocessing
        try:
            import multiprocessing as mp
            name = mp.current_process().name

            r1 = logging.makeLogRecord({'msg': f'msg1_{key}'})

            # https://bugs.python.org/issue45128
            with support.swap_item(sys.modules, 'multiprocessing', None):
                r2 = logging.makeLogRecord({'msg': f'msg2_{key}'})

            results = {'processName'  : name,
                       'r1.processName': r1.processName,
                       'r2.processName': r2.processName,
                      }
        finally:
            logging.logMultiprocessing = prev_logMultiprocessing
        # When run in a child process the results travel back over a pipe;
        # in the parent they are simply returned.
        if conn:
            conn.send(results)
        else:
            return results

    @skip_if_tsan_fork
    def test_multiprocessing(self):
        support.skip_if_broken_multiprocessing_synchronize()
        multiprocessing_imported = 'multiprocessing' in sys.modules
        try:
            # logMultiprocessing is True by default
            self.assertEqual(logging.logMultiprocessing, True)

            LOG_MULTI_PROCESSING = True
            # When logMultiprocessing == True:
            # In the main process processName = 'MainProcess'
            r = logging.makeLogRecord({})
            self.assertEqual(r.processName, 'MainProcess')

            results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING)
            self.assertEqual('MainProcess', results['processName'])
            self.assertEqual('MainProcess', results['r1.processName'])
            self.assertEqual('MainProcess', results['r2.processName'])

            # In other processes, processName is correct when multiprocessing in imported,
            # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762).
            import multiprocessing
            parent_conn, child_conn = multiprocessing.Pipe()
            p = multiprocessing.Process(
                target=self._extract_logrecord_process_name,
                args=(2, LOG_MULTI_PROCESSING, child_conn,)
            )
            p.start()
            results = parent_conn.recv()
            self.assertNotEqual('MainProcess', results['processName'])
            self.assertEqual(results['processName'], results['r1.processName'])
            self.assertEqual('MainProcess', results['r2.processName'])
            p.join()

        finally:
            # Re-import so sys.modules['multiprocessing'] is present again
            # if it was loaded before the test started.
            if multiprocessing_imported:
                import multiprocessing

    def test_optional(self):
        # The thread/process/task attributes are populated by default and
        # suppressed when the corresponding log* module flags are false.
        NONE = self.assertIsNone
        NOT_NONE = self.assertIsNotNone

        r = logging.makeLogRecord({})
        NOT_NONE(r.thread)
        NOT_NONE(r.threadName)
        NOT_NONE(r.process)
        NOT_NONE(r.processName)
        NONE(r.taskName)
        log_threads = logging.logThreads
        log_processes = logging.logProcesses
        log_multiprocessing = logging.logMultiprocessing
        log_asyncio_tasks = logging.logAsyncioTasks
        try:
            logging.logThreads = False
            logging.logProcesses = False
            logging.logMultiprocessing = False
            logging.logAsyncioTasks = False
            r = logging.makeLogRecord({})

            NONE(r.thread)
            NONE(r.threadName)
            NONE(r.process)
            NONE(r.processName)
            NONE(r.taskName)
        finally:
            logging.logThreads = log_threads
            logging.logProcesses = log_processes
            logging.logMultiprocessing = log_multiprocessing
            logging.logAsyncioTasks = log_asyncio_tasks

    async def _make_record_async(self, assertion):
        # Helper: create a record inside a running task and apply the given
        # assertion to its taskName attribute.
        r = logging.makeLogRecord({})
        assertion(r.taskName)

    @support.requires_working_socket()
    def test_taskName_with_asyncio_imported(self):
        # taskName is set only while logAsyncioTasks is true.
        try:
            make_record = self._make_record_async
            with asyncio.Runner() as runner:
                logging.logAsyncioTasks = True
                runner.run(make_record(self.assertIsNotNone))
                logging.logAsyncioTasks = False
                runner.run(make_record(self.assertIsNone))
        finally:
            asyncio.set_event_loop_policy(None)

    @support.requires_working_socket()
    def test_taskName_without_asyncio_imported(self):
        # With asyncio made unimportable, taskName stays None regardless
        # of the logAsyncioTasks flag.
        try:
            make_record = self._make_record_async
            with asyncio.Runner() as runner, support.swap_item(sys.modules, 'asyncio', None):
                logging.logAsyncioTasks = True
                runner.run(make_record(self.assertIsNone))
                logging.logAsyncioTasks = False
                runner.run(make_record(self.assertIsNone))
        finally:
            asyncio.set_event_loop_policy(None)
5370
5371
5372class BasicConfigTest(unittest.TestCase):
5373
5374    """Test suite for logging.basicConfig."""
5375
    def setUp(self):
        # Snapshot all process-global logging state touched by basicConfig
        # so cleanup() can restore it after each test.
        super(BasicConfigTest, self).setUp()
        self.handlers = logging.root.handlers
        self.saved_handlers = logging._handlers.copy()
        self.saved_handler_list = logging._handlerList[:]
        self.original_logging_level = logging.root.level
        self.addCleanup(self.cleanup)
        logging.root.handlers = []
5384
5385    def tearDown(self):
5386        for h in logging.root.handlers[:]:
5387            logging.root.removeHandler(h)
5388            h.close()
5389        super(BasicConfigTest, self).tearDown()
5390
5391    def cleanup(self):
5392        setattr(logging.root, 'handlers', self.handlers)
5393        logging._handlers.clear()
5394        logging._handlers.update(self.saved_handlers)
5395        logging._handlerList[:] = self.saved_handler_list
5396        logging.root.setLevel(self.original_logging_level)
5397
    def test_no_kwargs(self):
        # With no arguments, basicConfig() installs a single StreamHandler
        # on stderr with the default format/datefmt/style, leaving the
        # root level untouched.
        logging.basicConfig()

        # handler defaults to a StreamHandler to sys.stderr
        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.StreamHandler)
        self.assertEqual(handler.stream, sys.stderr)

        formatter = handler.formatter
        # format defaults to logging.BASIC_FORMAT
        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
        # datefmt defaults to None
        self.assertIsNone(formatter.datefmt)
        # style defaults to %
        self.assertIsInstance(formatter._style, logging.PercentStyle)

        # level is not explicitly set
        self.assertEqual(logging.root.level, self.original_logging_level)
5417
5418    def test_strformatstyle(self):
5419        with support.captured_stdout() as output:
5420            logging.basicConfig(stream=sys.stdout, style="{")
5421            logging.error("Log an error")
5422            sys.stdout.seek(0)
5423            self.assertEqual(output.getvalue().strip(),
5424                "ERROR:root:Log an error")
5425
5426    def test_stringtemplatestyle(self):
5427        with support.captured_stdout() as output:
5428            logging.basicConfig(stream=sys.stdout, style="$")
5429            logging.error("Log an error")
5430            sys.stdout.seek(0)
5431            self.assertEqual(output.getvalue().strip(),
5432                "ERROR:root:Log an error")
5433
    def test_filename(self):
        # filename= makes basicConfig() install a FileHandler; compare its
        # stream's mode and name against a reference handler built with
        # the expected arguments.

        def cleanup(h1, h2, fn):
            # Close both handlers before removing the shared log file.
            h1.close()
            h2.close()
            os.remove(fn)

        logging.basicConfig(filename='test.log', encoding='utf-8')

        self.assertEqual(len(logging.root.handlers), 1)
        handler = logging.root.handlers[0]
        self.assertIsInstance(handler, logging.FileHandler)

        expected = logging.FileHandler('test.log', 'a', encoding='utf-8')
        self.assertEqual(handler.stream.mode, expected.stream.mode)
        self.assertEqual(handler.stream.name, expected.stream.name)
        self.addCleanup(cleanup, handler, expected, 'test.log')
5451
    def test_filemode(self):
        # filemode= is passed through to the FileHandler's open() mode.

        def cleanup(h1, h2, fn):
            # Close both handlers before removing the shared log file.
            h1.close()
            h2.close()
            os.remove(fn)

        logging.basicConfig(filename='test.log', filemode='wb')

        handler = logging.root.handlers[0]
        expected = logging.FileHandler('test.log', 'wb')
        self.assertEqual(handler.stream.mode, expected.stream.mode)
        self.addCleanup(cleanup, handler, expected, 'test.log')
5465
5466    def test_stream(self):
5467        stream = io.StringIO()
5468        self.addCleanup(stream.close)
5469        logging.basicConfig(stream=stream)
5470
5471        self.assertEqual(len(logging.root.handlers), 1)
5472        handler = logging.root.handlers[0]
5473        self.assertIsInstance(handler, logging.StreamHandler)
5474        self.assertEqual(handler.stream, stream)
5475
5476    def test_format(self):
5477        logging.basicConfig(format='%(asctime)s - %(message)s')
5478
5479        formatter = logging.root.handlers[0].formatter
5480        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')
5481
5482    def test_datefmt(self):
5483        logging.basicConfig(datefmt='bar')
5484
5485        formatter = logging.root.handlers[0].formatter
5486        self.assertEqual(formatter.datefmt, 'bar')
5487
5488    def test_style(self):
5489        logging.basicConfig(style='$')
5490
5491        formatter = logging.root.handlers[0].formatter
5492        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
5493
5494    def test_level(self):
5495        old_level = logging.root.level
5496        self.addCleanup(logging.root.setLevel, old_level)
5497
5498        logging.basicConfig(level=57)
5499        self.assertEqual(logging.root.level, 57)
5500        # Test that second call has no effect
5501        logging.basicConfig(level=58)
5502        self.assertEqual(logging.root.level, 57)
5503
    def test_incompatible(self):
        # filename=, stream= and handlers= are mutually exclusive.
        assertRaises = self.assertRaises
        handlers = [logging.StreamHandler()]
        stream = sys.stderr
        assertRaises(ValueError, logging.basicConfig, filename='test.log',
                                                      stream=stream)
        assertRaises(ValueError, logging.basicConfig, filename='test.log',
                                                      handlers=handlers)
        assertRaises(ValueError, logging.basicConfig, stream=stream,
                                                      handlers=handlers)
        # Issue 23207: test for invalid kwargs
        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
        # Should pop both filename and filemode even if filename is None
        logging.basicConfig(filename=None, filemode='a')
5518
5519    def test_handlers(self):
5520        handlers = [
5521            logging.StreamHandler(),
5522            logging.StreamHandler(sys.stdout),
5523            logging.StreamHandler(),
5524        ]
5525        f = logging.Formatter()
5526        handlers[2].setFormatter(f)
5527        logging.basicConfig(handlers=handlers)
5528        self.assertIs(handlers[0], logging.root.handlers[0])
5529        self.assertIs(handlers[1], logging.root.handlers[1])
5530        self.assertIs(handlers[2], logging.root.handlers[2])
5531        self.assertIsNotNone(handlers[0].formatter)
5532        self.assertIsNotNone(handlers[1].formatter)
5533        self.assertIs(handlers[2].formatter, f)
5534        self.assertIs(handlers[0].formatter, handlers[1].formatter)
5535
    def test_force(self):
        # force=True removes and closes existing root handlers before
        # applying the new configuration; without it, a second call is a
        # no-op while handlers are present.
        old_string_io = io.StringIO()
        new_string_io = io.StringIO()
        old_handlers = [logging.StreamHandler(old_string_io)]
        new_handlers = [logging.StreamHandler(new_string_io)]
        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
        logging.warning('warn')
        logging.info('info')
        logging.debug('debug')
        self.assertEqual(len(logging.root.handlers), 1)
        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
                            force=True)
        logging.warning('warn')
        logging.info('info')
        logging.debug('debug')
        self.assertEqual(len(logging.root.handlers), 1)
        # The old stream only saw the WARNING-level phase; the new one
        # sees WARNING and INFO after the forced reconfiguration.
        self.assertEqual(old_string_io.getvalue().strip(),
                         'WARNING:root:warn')
        self.assertEqual(new_string_io.getvalue().strip(),
                         'WARNING:root:warn\nINFO:root:info')
5556
    def test_encoding(self):
        # encoding= is applied to the FileHandler; non-ASCII text must
        # round-trip through the log file intact.
        try:
            encoding = 'utf-8'
            logging.basicConfig(filename='test.log', encoding=encoding,
                                errors='strict',
                                format='%(message)s', level=logging.DEBUG)

            self.assertEqual(len(logging.root.handlers), 1)
            handler = logging.root.handlers[0]
            self.assertIsInstance(handler, logging.FileHandler)
            self.assertEqual(handler.encoding, encoding)
            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
        finally:
            handler.close()
            with open('test.log', encoding='utf-8') as f:
                data = f.read().strip()
            os.remove('test.log')
            self.assertEqual(data,
                             'The Øresund Bridge joins Copenhagen to Malmö')
5576
    def test_encoding_errors(self):
        # errors='ignore' silently drops characters the encoding cannot
        # represent.
        try:
            encoding = 'ascii'
            logging.basicConfig(filename='test.log', encoding=encoding,
                                errors='ignore',
                                format='%(message)s', level=logging.DEBUG)

            self.assertEqual(len(logging.root.handlers), 1)
            handler = logging.root.handlers[0]
            self.assertIsInstance(handler, logging.FileHandler)
            self.assertEqual(handler.encoding, encoding)
            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
        finally:
            handler.close()
            with open('test.log', encoding='utf-8') as f:
                data = f.read().strip()
            os.remove('test.log')
            self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm')
5595
    def test_encoding_errors_default(self):
        # When errors= is not given, FileHandler defaults to
        # 'backslashreplace', escaping unencodable characters.
        try:
            encoding = 'ascii'
            logging.basicConfig(filename='test.log', encoding=encoding,
                                format='%(message)s', level=logging.DEBUG)

            self.assertEqual(len(logging.root.handlers), 1)
            handler = logging.root.handlers[0]
            self.assertIsInstance(handler, logging.FileHandler)
            self.assertEqual(handler.encoding, encoding)
            self.assertEqual(handler.errors, 'backslashreplace')
            logging.debug('😂: ☃️: The Øresund Bridge joins Copenhagen to Malmö')
        finally:
            handler.close()
            with open('test.log', encoding='utf-8') as f:
                data = f.read().strip()
            os.remove('test.log')
            self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund '
                                   r'Bridge joins Copenhagen to Malm\xf6')
5615
    def test_encoding_errors_none(self):
        # Specifying None should behave as 'strict'
        try:
            encoding = 'ascii'
            logging.basicConfig(filename='test.log', encoding=encoding,
                                errors=None,
                                format='%(message)s', level=logging.DEBUG)

            self.assertEqual(len(logging.root.handlers), 1)
            handler = logging.root.handlers[0]
            self.assertIsInstance(handler, logging.FileHandler)
            self.assertEqual(handler.encoding, encoding)
            self.assertIsNone(handler.errors)

            message = []

            # Capture the encoding failure instead of printing it to stderr.
            def dummy_handle_error(record):
                message.append(str(sys.exception()))

            handler.handleError = dummy_handle_error
            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
            self.assertTrue(message)
            self.assertIn("'ascii' codec can't encode "
                          "character '\\xd8' in position 4:", message[0])
        finally:
            handler.close()
            with open('test.log', encoding='utf-8') as f:
                data = f.read().strip()
            os.remove('test.log')
            # didn't write anything due to the encoding error
            self.assertEqual(data, r'')
5647
    @support.requires_working_socket()
    def test_log_taskName(self):
        # %(taskName)s expands to the current asyncio task's name when
        # logging from inside a running task with logAsyncioTasks enabled.
        async def log_record():
            logging.warning('hello world')

        handler = None
        log_filename = make_temp_file('.log', 'test-logging-taskname-')
        self.addCleanup(os.remove, log_filename)
        try:
            encoding = 'utf-8'
            logging.basicConfig(filename=log_filename, errors='strict',
                                encoding=encoding, level=logging.WARNING,
                                format='%(taskName)s - %(message)s')

            self.assertEqual(len(logging.root.handlers), 1)
            handler = logging.root.handlers[0]
            self.assertIsInstance(handler, logging.FileHandler)

            with asyncio.Runner(debug=True) as runner:
                logging.logAsyncioTasks = True
                runner.run(log_record())
            with open(log_filename, encoding='utf-8') as f:
                data = f.read().strip()
            self.assertRegex(data, r'Task-\d+ - hello world')
        finally:
            asyncio.set_event_loop_policy(None)
            if handler:
                handler.close()
5676
5677
5678    def _test_log(self, method, level=None):
5679        # logging.root has no handlers so basicConfig should be called
5680        called = []
5681
5682        old_basic_config = logging.basicConfig
5683        def my_basic_config(*a, **kw):
5684            old_basic_config()
5685            old_level = logging.root.level
5686            logging.root.setLevel(100)  # avoid having messages in stderr
5687            self.addCleanup(logging.root.setLevel, old_level)
5688            called.append((a, kw))
5689
5690        support.patch(self, logging, 'basicConfig', my_basic_config)
5691
5692        log_method = getattr(logging, method)
5693        if level is not None:
5694            log_method(level, "test me")
5695        else:
5696            log_method("test me")
5697
5698        # basicConfig was called with no arguments
5699        self.assertEqual(called, [((), {})])
5700
    # Each method below exercises one module-level convenience function;
    # _test_log() asserts that it calls basicConfig() with no arguments.
    def test_log(self):
        self._test_log('log', logging.WARNING)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')
5718
5719
class LoggerAdapterTest(unittest.TestCase):
    """Tests for logging.LoggerAdapter, wrapping the root logger."""

    def setUp(self):
        super(LoggerAdapterTest, self).setUp()
        # Snapshot the module-global handler list so cleanup() below can
        # restore it exactly as it was.
        old_handler_list = logging._handlerList[:]

        self.recording = RecordingHandler()
        self.logger = logging.root
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)

        def cleanup():
            logging._handlerList[:] = old_handler_list

        self.addCleanup(cleanup)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)

    def test_exception(self):
        # exception() must log at ERROR and attach the active exc_info.
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.adapter.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_exception_excinfo(self):
        # Passing exc_info=<exception instance> explicitly must be expanded
        # into the full (type, value, traceback) triple on the record.
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e

        self.adapter.exception('exc_info test', exc_info=exc)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_critical(self):
        msg = 'critical test! %r'
        self.adapter.critical(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.CRITICAL)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        # The record must be attributed to the adapter's caller, not to
        # the adapter internals.
        self.assertEqual(record.funcName, 'test_critical')

    def test_is_enabled_for(self):
        old_disable = self.adapter.logger.manager.disable
        self.adapter.logger.manager.disable = 33
        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
                        old_disable)
        self.assertFalse(self.adapter.isEnabledFor(32))

    def test_has_handlers(self):
        self.assertTrue(self.adapter.hasHandlers())

        for handler in self.logger.handlers:
            self.logger.removeHandler(handler)

        self.assertFalse(self.logger.hasHandlers())
        self.assertFalse(self.adapter.hasHandlers())

    def test_nested(self):
        # Adapters wrapping adapters: each layer's process() must be
        # applied, and the manager attribute must delegate through every
        # layer down to the underlying logger.
        msg = 'Adapters can be nested, yo.'
        adapter = PrefixAdapter(logger=self.logger, extra=None)
        adapter_adapter = PrefixAdapter(logger=adapter, extra=None)
        adapter_adapter.prefix = 'AdapterAdapter'
        self.assertEqual(repr(adapter), repr(adapter_adapter))
        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.CRITICAL)
        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.funcName, 'test_nested')
        orig_manager = adapter_adapter.manager
        self.assertIs(adapter.manager, orig_manager)
        self.assertIs(self.logger.manager, orig_manager)
        temp_manager = object()
        try:
            # Assigning manager on the outermost adapter must propagate
            # to the inner adapter and the wrapped logger.
            adapter_adapter.manager = temp_manager
            self.assertIs(adapter_adapter.manager, temp_manager)
            self.assertIs(adapter.manager, temp_manager)
            self.assertIs(self.logger.manager, temp_manager)
        finally:
            adapter_adapter.manager = orig_manager
        self.assertIs(adapter_adapter.manager, orig_manager)
        self.assertIs(adapter.manager, orig_manager)
        self.assertIs(self.logger.manager, orig_manager)

    def test_styled_adapter(self):
        # Test an example from the Cookbook.
        records = self.recording.records
        adapter = StyleAdapter(self.logger)
        adapter.warning('Hello, {}!', 'world')
        self.assertEqual(str(records[-1].msg), 'Hello, world!')
        self.assertEqual(records[-1].funcName, 'test_styled_adapter')
        adapter.log(logging.WARNING, 'Goodbye {}.', 'world')
        self.assertEqual(str(records[-1].msg), 'Goodbye world.')
        self.assertEqual(records[-1].funcName, 'test_styled_adapter')

    def test_nested_styled_adapter(self):
        records = self.recording.records
        adapter = PrefixAdapter(self.logger)
        adapter.prefix = '{}'
        adapter2 = StyleAdapter(adapter)
        adapter2.warning('Hello, {}!', 'world')
        # The '{}' prefix is prepended to the already-formatted message,
        # so it must survive literally in the final text.
        self.assertEqual(str(records[-1].msg), '{} Hello, world!')
        self.assertEqual(records[-1].funcName, 'test_nested_styled_adapter')
        adapter2.log(logging.WARNING, 'Goodbye {}.', 'world')
        self.assertEqual(str(records[-1].msg), '{} Goodbye world.')
        self.assertEqual(records[-1].funcName, 'test_nested_styled_adapter')

    def test_find_caller_with_stacklevel(self):
        # Each increment of stacklevel must attribute the record to one
        # frame further up the innermost -> inner -> outer call chain.
        the_level = 1
        trigger = self.adapter.warning

        def innermost():
            trigger('test', stacklevel=the_level)

        def inner():
            innermost()

        def outer():
            inner()

        records = self.recording.records
        outer()
        self.assertEqual(records[-1].funcName, 'innermost')
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'inner')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'outer')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
        self.assertGreater(records[-1].lineno, lineno)

    def test_extra_in_records(self):
        self.adapter = logging.LoggerAdapter(logger=self.logger,
                                             extra={'foo': '1'})

        self.adapter.critical('foo should be here')
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertTrue(hasattr(record, 'foo'))
        self.assertEqual(record.foo, '1')

    def test_extra_not_merged_by_default(self):
        self.adapter.critical('foo should NOT be here', extra={'foo': 'nope'})
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertFalse(hasattr(record, 'foo'))

    def test_extra_merged(self):
        self.adapter = logging.LoggerAdapter(logger=self.logger,
                                             extra={'foo': '1'},
                                             merge_extra=True)

        self.adapter.critical('foo and bar should be here', extra={'bar': '2'})
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertTrue(hasattr(record, 'foo'))
        self.assertTrue(hasattr(record, 'bar'))
        self.assertEqual(record.foo, '1')
        self.assertEqual(record.bar, '2')

    def test_extra_merged_log_call_has_precedence(self):
        self.adapter = logging.LoggerAdapter(logger=self.logger,
                                             extra={'foo': '1'},
                                             merge_extra=True)

        self.adapter.critical('foo shall be min', extra={'foo': '2'})
        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertTrue(hasattr(record, 'foo'))
        self.assertEqual(record.foo, '2')
5917
5918
5919class PrefixAdapter(logging.LoggerAdapter):
5920    prefix = 'Adapter'
5921
5922    def process(self, msg, kwargs):
5923        return f"{self.prefix} {msg}", kwargs
5924
5925
class Message:
    """Defer {}-style formatting of *fmt* with *args* until str() is called."""

    def __init__(self, fmt, args):
        self.fmt, self.args = fmt, args

    def __str__(self):
        # Formatting happens lazily, only when a handler renders the record.
        return self.fmt.format(*self.args)
5933
5934
class StyleAdapter(logging.LoggerAdapter):
    """Adapter enabling {}-style message formatting (Cookbook example)."""

    def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
        if not self.isEnabledFor(level):
            return
        msg, kwargs = self.process(msg, kwargs)
        # Bump stacklevel so caller lookup skips this wrapper frame.
        self.logger.log(level, Message(msg, args),
                        stacklevel=stacklevel + 1, **kwargs)
5941
5942
class LoggerTest(BaseTest, AssertErrorMessage):
    """Tests for the logging.Logger class (levels, caller lookup, caching)."""

    def setUp(self):
        super(LoggerTest, self).setUp()
        self.recording = RecordingHandler()
        self.logger = logging.Logger(name='blah')
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)
        self.addCleanup(logging.shutdown)

    def test_set_invalid_level(self):
        self.assert_error_message(
            TypeError, 'Level not an integer or a valid string: None',
            self.logger.setLevel, None)
        self.assert_error_message(
            TypeError, 'Level not an integer or a valid string: (0, 0)',
            self.logger.setLevel, (0, 0))

    def test_exception(self):
        # exception() must log at ERROR and attach the active exc_info.
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.logger.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_log_invalid_level_with_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', True):
            self.assertRaises(TypeError, self.logger.log, '10', 'test message')

    def test_log_invalid_level_no_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', False):
            self.logger.log('10', 'test message')  # no exception happens

    def test_find_caller_with_stack_info(self):
        called = []
        support.patch(self, logging.traceback, 'print_stack',
                      lambda f, file: called.append(file.getvalue()))

        self.logger.findCaller(stack_info=True)

        self.assertEqual(len(called), 1)
        self.assertEqual('Stack (most recent call last):\n', called[0])

    def test_find_caller_with_stacklevel(self):
        # Each increment of stacklevel must attribute the record to one
        # frame further up the innermost -> inner -> outer call chain.
        the_level = 1
        trigger = self.logger.warning

        def innermost():
            trigger('test', stacklevel=the_level)

        def inner():
            innermost()

        def outer():
            inner()

        records = self.recording.records
        outer()
        self.assertEqual(records[-1].funcName, 'innermost')
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'inner')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'outer')
        self.assertGreater(records[-1].lineno, lineno)
        lineno = records[-1].lineno
        # Detour via the module-level logging.warning() helper: with the
        # same stacklevel it must yield the same attribution ('outer'),
        # i.e. its own extra frame must not skew the accounting.
        root_logger = logging.getLogger()
        root_logger.addHandler(self.recording)
        trigger = logging.warning
        outer()
        self.assertEqual(records[-1].funcName, 'outer')
        root_logger.removeHandler(self.recording)
        trigger = self.logger.warning
        the_level += 1
        outer()
        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
        self.assertGreater(records[-1].lineno, lineno)

    def test_make_record_with_extra_overwrite(self):
        # extra keys that would shadow an existing LogRecord attribute
        # (or 'message'/'asctime') must be rejected with KeyError.
        name = 'my record'
        level = 13
        fn = lno = msg = args = exc_info = func = sinfo = None
        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
                                       exc_info, func, sinfo)

        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
            extra = {key: 'some value'}
            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
                              fn, lno, msg, args, exc_info,
                              extra=extra, sinfo=sinfo)

    def test_make_record_with_extra_no_overwrite(self):
        name = 'my record'
        level = 13
        fn = lno = msg = args = exc_info = func = sinfo = None
        extra = {'valid_key': 'some value'}
        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
                                        exc_info, extra=extra, sinfo=sinfo)
        self.assertIn('valid_key', result.__dict__)

    def test_has_handlers(self):
        self.assertTrue(self.logger.hasHandlers())

        for handler in self.logger.handlers:
            self.logger.removeHandler(handler)
        self.assertFalse(self.logger.hasHandlers())

    def test_has_handlers_no_propagate(self):
        child_logger = logging.getLogger('blah.child')
        child_logger.propagate = False
        self.assertFalse(child_logger.hasHandlers())

    def test_is_enabled_for(self):
        old_disable = self.logger.manager.disable
        self.logger.manager.disable = 23
        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
        self.assertFalse(self.logger.isEnabledFor(22))

    def test_is_enabled_for_disabled_logger(self):
        old_disabled = self.logger.disabled
        old_disable = self.logger.manager.disable

        self.logger.disabled = True
        self.logger.manager.disable = 21

        self.addCleanup(setattr, self.logger, 'disabled', old_disabled)
        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)

        self.assertFalse(self.logger.isEnabledFor(22))

    def test_root_logger_aliases(self):
        root = logging.getLogger()
        self.assertIs(root, logging.root)
        self.assertIs(root, logging.getLogger(None))
        self.assertIs(root, logging.getLogger(''))
        self.assertIs(root, logging.getLogger('root'))
        self.assertIs(root, logging.getLogger('foo').root)
        self.assertIs(root, logging.getLogger('foo.bar').root)
        self.assertIs(root, logging.getLogger('foo').parent)

        self.assertIsNot(root, logging.getLogger('\0'))
        self.assertIsNot(root, logging.getLogger('foo.bar').parent)

    def test_invalid_names(self):
        self.assertRaises(TypeError, logging.getLogger, any)
        self.assertRaises(TypeError, logging.getLogger, b'foo')

    def test_pickling(self):
        # Unpickling a logger must return the registered singleton.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
                logger = logging.getLogger(name)
                s = pickle.dumps(logger, proto)
                unpickled = pickle.loads(s)
                self.assertIs(unpickled, logger)

    def test_caching(self):
        root = self.root_logger
        logger1 = logging.getLogger("abc")
        logger2 = logging.getLogger("abc.def")

        # Set root logger level and ensure cache is empty
        root.setLevel(logging.ERROR)
        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
        self.assertEqual(logger2._cache, {})

        # Ensure cache is populated and calls are consistent
        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
        self.assertEqual(root._cache, {})
        self.assertTrue(logger2.isEnabledFor(logging.ERROR))

        # Ensure root cache gets populated
        self.assertEqual(root._cache, {})
        self.assertTrue(root.isEnabledFor(logging.ERROR))
        self.assertEqual(root._cache, {logging.ERROR: True})

        # Set parent logger level and ensure caches are emptied
        logger1.setLevel(logging.CRITICAL)
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})

        # Ensure logger2 uses parent logger's effective level
        self.assertFalse(logger2.isEnabledFor(logging.ERROR))

        # Set level to NOTSET and ensure caches are empty
        logger2.setLevel(logging.NOTSET)
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})
        self.assertEqual(logger1._cache, {})
        self.assertEqual(root._cache, {})

        # Verify logger2 follows parent and not root
        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
        self.assertTrue(root.isEnabledFor(logging.ERROR))

        # Disable logging in manager and ensure caches are clear
        logging.disable()
        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
        self.assertEqual(logger2._cache, {})
        self.assertEqual(logger1._cache, {})
        self.assertEqual(root._cache, {})

        # Ensure no loggers are enabled
        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
6168
6169
class BaseFileTest(BaseTest):
    "Base class for handler tests that write log files"

    def setUp(self):
        BaseTest.setUp(self)
        # Fresh target log file for each test; extra files a test creates
        # (e.g. rotated backups) are registered in self.rmfiles for removal.
        self.fn = make_temp_file(".log", "test_logging-2-")
        self.rmfiles = []

    def tearDown(self):
        for fn in self.rmfiles:
            os.unlink(fn)
        if os.path.exists(self.fn):
            os.unlink(self.fn)
        BaseTest.tearDown(self)

    def assertLogFile(self, filename):
        "Assert a log file is there and register it for deletion"
        self.assertTrue(os.path.exists(filename),
                        msg="Log file %r does not exist" % filename)
        self.rmfiles.append(filename)

    def next_rec(self):
        # Build a minimal DEBUG record whose message text comes from
        # self.next_message() (defined on BaseTest).
        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
                                 self.next_message(), None, None, None)
6194
class FileHandlerTest(BaseFileTest):
    """Tests for logging.FileHandler specifics (delay, emit-after-close)."""

    def test_delay(self):
        """delay=True must postpone opening the stream until first emit."""
        os.unlink(self.fn)
        fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True)
        self.assertIsNone(fh.stream)
        self.assertFalse(os.path.exists(self.fn))
        fh.handle(logging.makeLogRecord({}))
        self.assertIsNotNone(fh.stream)
        self.assertTrue(os.path.exists(self.fn))
        fh.close()

    def test_emit_after_closing_in_write_mode(self):
        """After close(), emitting on a mode='w' handler must not wipe the file.

        Issue #42378: the file must still contain only the first record.
        """
        os.unlink(self.fn)
        fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w')
        fh.setFormatter(logging.Formatter('%(message)s'))
        fh.emit(self.next_rec())    # '1'
        fh.close()
        fh.emit(self.next_rec())    # '2'
        # Read back with the encoding the handler wrote with, instead of
        # relying on the locale default.
        with open(self.fn, encoding='utf-8') as fp:
            self.assertEqual(fp.read().strip(), '1')
6216
class RotatingFileHandlerTest(BaseFileTest):
    """Tests for logging.handlers.RotatingFileHandler rollover behaviour."""

    def test_should_not_rollover(self):
        # If file is empty rollover never occurs
        rh = logging.handlers.RotatingFileHandler(
            self.fn, encoding="utf-8", maxBytes=1)
        self.assertFalse(rh.shouldRollover(None))
        rh.close()

        # If maxBytes is zero rollover never occurs
        rh = logging.handlers.RotatingFileHandler(
                self.fn, encoding="utf-8", maxBytes=0)
        self.assertFalse(rh.shouldRollover(None))
        rh.close()

        # Even with existing content, maxBytes=0 disables rollover.
        with open(self.fn, 'wb') as f:
            f.write(b'\n')
        rh = logging.handlers.RotatingFileHandler(
                self.fn, encoding="utf-8", maxBytes=0)
        self.assertFalse(rh.shouldRollover(None))
        rh.close()

    @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.")
    def test_should_not_rollover_non_file(self):
        # bpo-45401 - test with special file
        # We set maxBytes to 1 so that rollover would normally happen, except
        # for the check for regular files
        rh = logging.handlers.RotatingFileHandler(
                os.devnull, encoding="utf-8", maxBytes=1)
        self.assertFalse(rh.shouldRollover(self.next_rec()))
        rh.close()

    def test_should_rollover(self):
        # A non-empty file plus a tiny maxBytes must trigger rollover.
        with open(self.fn, 'wb') as f:
            f.write(b'\n')
        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=2)
        self.assertTrue(rh.shouldRollover(self.next_rec()))
        rh.close()

    def test_file_created(self):
        # checks that the file is created and assumes it was created
        # by us
        os.unlink(self.fn)
        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8")
        rh.emit(self.next_rec())
        self.assertLogFile(self.fn)
        rh.close()

    def test_max_bytes(self, delay=False):
        # shouldRollover() must depend on the pending record's size, and
        # emitting an oversized record must produce the ".1" backup.
        kwargs = {'delay': delay} if delay else {}
        os.unlink(self.fn)
        rh = logging.handlers.RotatingFileHandler(
            self.fn, encoding="utf-8", backupCount=2, maxBytes=100, **kwargs)
        self.assertIs(os.path.exists(self.fn), not delay)
        small = logging.makeLogRecord({'msg': 'a'})
        large = logging.makeLogRecord({'msg': 'b'*100})
        self.assertFalse(rh.shouldRollover(small))
        self.assertFalse(rh.shouldRollover(large))
        rh.emit(small)
        self.assertLogFile(self.fn)
        self.assertFalse(os.path.exists(self.fn + ".1"))
        self.assertFalse(rh.shouldRollover(small))
        self.assertTrue(rh.shouldRollover(large))
        rh.emit(large)
        self.assertTrue(os.path.exists(self.fn))
        self.assertLogFile(self.fn + ".1")
        self.assertFalse(os.path.exists(self.fn + ".2"))
        self.assertTrue(rh.shouldRollover(small))
        self.assertTrue(rh.shouldRollover(large))
        rh.close()

    def test_max_bytes_delay(self):
        self.test_max_bytes(delay=True)

    def test_rollover_filenames(self):
        # A custom namer must be applied to every backup filename, and
        # backupCount=2 must cap the number of backups kept.
        def namer(name):
            return name + ".test"
        rh = logging.handlers.RotatingFileHandler(
            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
        rh.namer = namer
        rh.emit(self.next_rec())
        self.assertLogFile(self.fn)
        self.assertFalse(os.path.exists(namer(self.fn + ".1")))
        rh.emit(self.next_rec())
        self.assertLogFile(namer(self.fn + ".1"))
        self.assertFalse(os.path.exists(namer(self.fn + ".2")))
        rh.emit(self.next_rec())
        self.assertLogFile(namer(self.fn + ".2"))
        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        rh.emit(self.next_rec())
        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        rh.close()

    def test_namer_rotator_inheritance(self):
        # namer/rotator may also be provided by overriding the methods in
        # a subclass, not only by assigning callables to the instance.
        class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler):
            def namer(self, name):
                return name + ".test"

            def rotator(self, source, dest):
                if os.path.exists(source):
                    os.replace(source, dest + ".rotated")

        rh = HandlerWithNamerAndRotator(
            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
        self.assertEqual(rh.namer(self.fn), self.fn + ".test")
        rh.emit(self.next_rec())
        self.assertLogFile(self.fn)
        rh.emit(self.next_rec())
        self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated")
        self.assertFalse(os.path.exists(rh.namer(self.fn + ".1")))
        rh.close()

    @support.requires_zlib()
    def test_rotator(self):
        # Cookbook-style compressing rotator: each backup is written
        # zlib-compressed under a ".gz"-suffixed name supplied by the namer.
        def namer(name):
            return name + ".gz"

        def rotator(source, dest):
            with open(source, "rb") as sf:
                data = sf.read()
                compressed = zlib.compress(data, 9)
                with open(dest, "wb") as df:
                    df.write(compressed)
            os.remove(source)

        rh = logging.handlers.RotatingFileHandler(
            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
        rh.rotator = rotator
        rh.namer = namer
        m1 = self.next_rec()
        rh.emit(m1)
        self.assertLogFile(self.fn)
        m2 = self.next_rec()
        rh.emit(m2)
        fn = namer(self.fn + ".1")
        self.assertLogFile(fn)
        newline = os.linesep
        with open(fn, "rb") as f:
            compressed = f.read()
            data = zlib.decompress(compressed)
            self.assertEqual(data.decode("ascii"), m1.msg + newline)
        rh.emit(self.next_rec())
        fn = namer(self.fn + ".2")
        self.assertLogFile(fn)
        with open(fn, "rb") as f:
            compressed = f.read()
            data = zlib.decompress(compressed)
            self.assertEqual(data.decode("ascii"), m1.msg + newline)
        rh.emit(self.next_rec())
        fn = namer(self.fn + ".2")
        with open(fn, "rb") as f:
            compressed = f.read()
            data = zlib.decompress(compressed)
            self.assertEqual(data.decode("ascii"), m2.msg + newline)
        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        rh.close()
6372
6373class TimedRotatingFileHandlerTest(BaseFileTest):
    @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.")
    def test_should_not_rollover(self):
        # See bpo-45401. Should only ever rollover regular files
        # Even after the 'S' (per-second) interval has elapsed, a
        # non-regular file (the null device) must never be rolled over.
        fh = logging.handlers.TimedRotatingFileHandler(
                os.devnull, 'S', encoding="utf-8", backupCount=1)
        time.sleep(1.1)    # a little over a second ...
        r = logging.makeLogRecord({'msg': 'testing - device file'})
        self.assertFalse(fh.shouldRollover(r))
        fh.close()
6383
6384    # other test methods added below
6385    def test_rollover(self):
6386        fh = logging.handlers.TimedRotatingFileHandler(
6387                self.fn, 'S', encoding="utf-8", backupCount=1)
6388        fmt = logging.Formatter('%(asctime)s %(message)s')
6389        fh.setFormatter(fmt)
6390        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
6391        fh.emit(r1)
6392        self.assertLogFile(self.fn)
6393        time.sleep(1.1)    # a little over a second ...
6394        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
6395        fh.emit(r2)
6396        fh.close()
6397        # At this point, we should have a recent rotated file which we
6398        # can test for the existence of. However, in practice, on some
6399        # machines which run really slowly, we don't know how far back
6400        # in time to go to look for the log file. So, we go back a fair
6401        # bit, and stop as soon as we see a rotated file. In theory this
6402        # could of course still fail, but the chances are lower.
6403        found = False
6404        now = datetime.datetime.now()
6405        GO_BACK = 5 * 60 # seconds
6406        for secs in range(GO_BACK):
6407            prev = now - datetime.timedelta(seconds=secs)
6408            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
6409            found = os.path.exists(fn)
6410            if found:
6411                self.rmfiles.append(fn)
6412                break
6413        msg = 'No rotated files found, went back %d seconds' % GO_BACK
6414        if not found:
6415            # print additional diagnostics
6416            dn, fn = os.path.split(self.fn)
6417            files = [f for f in os.listdir(dn) if f.startswith(fn)]
6418            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
6419            print('The only matching files are: %s' % files, file=sys.stderr)
6420            for f in files:
6421                print('Contents of %s:' % f)
6422                path = os.path.join(dn, f)
6423                with open(path, 'r') as tf:
6424                    print(tf.read())
6425        self.assertTrue(found, msg=msg)
6426
    def test_rollover_at_midnight(self, weekly=False):
        """Emit records on either side of a rollover boundary and check
        which file each record lands in.

        With weekly=True, a 'W<today's weekday>' rollover is exercised
        instead of 'MIDNIGHT'.
        """
        os_helper.unlink(self.fn)
        now = datetime.datetime.now()
        atTime = now.time()
        if not 0.1 < atTime.microsecond/1e6 < 0.9:
            # The test requires all records to be emitted within
            # the range of the same whole second.
            time.sleep((0.1 - atTime.microsecond/1e6) % 1.0)
            now = datetime.datetime.now()
            atTime = now.time()
        atTime = atTime.replace(microsecond=0)
        fmt = logging.Formatter('%(asctime)s %(message)s')
        when = f'W{now.weekday()}' if weekly else 'MIDNIGHT'
        # First batch: all records go to the base file (no rollover yet).
        for i in range(3):
            fh = logging.handlers.TimedRotatingFileHandler(
                self.fn, encoding="utf-8", when=when, atTime=atTime)
            fh.setFormatter(fmt)
            r2 = logging.makeLogRecord({'msg': f'testing1 {i}'})
            fh.emit(r2)
            fh.close()
        self.assertLogFile(self.fn)
        with open(self.fn, encoding="utf-8") as f:
            for i, line in enumerate(f):
                self.assertIn(f'testing1 {i}', line)

        # Backdate the file's mtime by one second so that a freshly
        # created handler considers the rollover time already passed
        # and rotates before writing the second batch.
        os.utime(self.fn, (now.timestamp() - 1,)*2)
        for i in range(2):
            fh = logging.handlers.TimedRotatingFileHandler(
                self.fn, encoding="utf-8", when=when, atTime=atTime)
            fh.setFormatter(fmt)
            r2 = logging.makeLogRecord({'msg': f'testing2 {i}'})
            fh.emit(r2)
            fh.close()
        # The rotated file is named for the previous rollover period.
        rolloverDate = now - datetime.timedelta(days=7 if weekly else 1)
        otherfn = f'{self.fn}.{rolloverDate:%Y-%m-%d}'
        self.assertLogFile(otherfn)
        with open(self.fn, encoding="utf-8") as f:
            for i, line in enumerate(f):
                self.assertIn(f'testing2 {i}', line)
        with open(otherfn, encoding="utf-8") as f:
            for i, line in enumerate(f):
                self.assertIn(f'testing1 {i}', line)
6469
6470    def test_rollover_at_weekday(self):
6471        self.test_rollover_at_midnight(weekly=True)
6472
6473    def test_invalid(self):
6474        assertRaises = self.assertRaises
6475        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
6476                     self.fn, 'X', encoding="utf-8", delay=True)
6477        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
6478                     self.fn, 'W', encoding="utf-8", delay=True)
6479        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
6480                     self.fn, 'W7', encoding="utf-8", delay=True)
6481
6482    # TODO: Test for utc=False.
6483    def test_compute_rollover_daily_attime(self):
6484        currentTime = 0
6485        rh = logging.handlers.TimedRotatingFileHandler(
6486            self.fn, encoding="utf-8", when='MIDNIGHT',
6487            utc=True, atTime=None)
6488        try:
6489            actual = rh.computeRollover(currentTime)
6490            self.assertEqual(actual, currentTime + 24 * 60 * 60)
6491
6492            actual = rh.computeRollover(currentTime + 24 * 60 * 60 - 1)
6493            self.assertEqual(actual, currentTime + 24 * 60 * 60)
6494
6495            actual = rh.computeRollover(currentTime + 24 * 60 * 60)
6496            self.assertEqual(actual, currentTime + 48 * 60 * 60)
6497
6498            actual = rh.computeRollover(currentTime + 25 * 60 * 60)
6499            self.assertEqual(actual, currentTime + 48 * 60 * 60)
6500        finally:
6501            rh.close()
6502
6503        atTime = datetime.time(12, 0, 0)
6504        rh = logging.handlers.TimedRotatingFileHandler(
6505            self.fn, encoding="utf-8", when='MIDNIGHT',
6506            utc=True, atTime=atTime)
6507        try:
6508            actual = rh.computeRollover(currentTime)
6509            self.assertEqual(actual, currentTime + 12 * 60 * 60)
6510
6511            actual = rh.computeRollover(currentTime + 12 * 60 * 60 - 1)
6512            self.assertEqual(actual, currentTime + 12 * 60 * 60)
6513
6514            actual = rh.computeRollover(currentTime + 12 * 60 * 60)
6515            self.assertEqual(actual, currentTime + 36 * 60 * 60)
6516
6517            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
6518            self.assertEqual(actual, currentTime + 36 * 60 * 60)
6519        finally:
6520            rh.close()
6521
    # TODO: Test for utc=False.
    def test_compute_rollover_weekly_attime(self):
        """computeRollover() for each weekly specifier 'W0'..'W6' (UTC)
        with an explicit 12:00 atTime, checked just before, at, and just
        after the rollover instant."""
        currentTime = int(time.time())
        # Truncate to the start of the current UTC day.
        today = currentTime - currentTime % 86400

        atTime = datetime.time(12, 0, 0)

        wday = time.gmtime(today).tm_wday
        for day in range(7):
            rh = logging.handlers.TimedRotatingFileHandler(
                self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0,
                utc=True, atTime=atTime)
            try:
                if wday > day:
                    # The rollover day has already passed this week, so we
                    # go over into next week
                    expected = (7 - wday + day)
                else:
                    expected = (day - wday)
                # At this point expected is in days from now, convert to seconds
                expected *= 24 * 60 * 60
                # Add in the rollover time
                expected += 12 * 60 * 60
                # Add in adjustment for today
                expected += today

                # The prints below are diagnostics for timezone-dependent
                # failures on some buildbots.
                actual = rh.computeRollover(today)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone)
                    print('local vars: %s' % locals())
                self.assertEqual(actual, expected)

                actual = rh.computeRollover(today + 12 * 60 * 60 - 1)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone)
                    print('local vars: %s' % locals())
                self.assertEqual(actual, expected)

                if day == wday:
                    # goes into following week
                    expected += 7 * 24 * 60 * 60
                actual = rh.computeRollover(today + 12 * 60 * 60)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone)
                    print('local vars: %s' % locals())
                self.assertEqual(actual, expected)

                actual = rh.computeRollover(today + 13 * 60 * 60)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone)
                    print('local vars: %s' % locals())
                self.assertEqual(actual, expected)
            finally:
                rh.close()
6576
6577    def test_compute_files_to_delete(self):
6578        # See bpo-46063 for background
6579        wd = tempfile.mkdtemp(prefix='test_logging_')
6580        self.addCleanup(shutil.rmtree, wd)
6581        times = []
6582        dt = datetime.datetime.now()
6583        for i in range(10):
6584            times.append(dt.strftime('%Y-%m-%d_%H-%M-%S'))
6585            dt += datetime.timedelta(seconds=5)
6586        prefixes = ('a.b', 'a.b.c', 'd.e', 'd.e.f', 'g')
6587        files = []
6588        rotators = []
6589        for prefix in prefixes:
6590            p = os.path.join(wd, '%s.log' % prefix)
6591            rotator = logging.handlers.TimedRotatingFileHandler(p, when='s',
6592                                                                interval=5,
6593                                                                backupCount=7,
6594                                                                delay=True)
6595            rotators.append(rotator)
6596            if prefix.startswith('a.b'):
6597                for t in times:
6598                    files.append('%s.log.%s' % (prefix, t))
6599            elif prefix.startswith('d.e'):
6600                def namer(filename):
6601                    dirname, basename = os.path.split(filename)
6602                    basename = basename.replace('.log', '') + '.log'
6603                    return os.path.join(dirname, basename)
6604                rotator.namer = namer
6605                for t in times:
6606                    files.append('%s.%s.log' % (prefix, t))
6607            elif prefix == 'g':
6608                def namer(filename):
6609                    dirname, basename = os.path.split(filename)
6610                    basename = 'g' + basename[6:] + '.oldlog'
6611                    return os.path.join(dirname, basename)
6612                rotator.namer = namer
6613                for t in times:
6614                    files.append('g%s.oldlog' % t)
6615        # Create empty files
6616        for fn in files:
6617            p = os.path.join(wd, fn)
6618            with open(p, 'wb') as f:
6619                pass
6620        # Now the checks that only the correct files are offered up for deletion
6621        for i, prefix in enumerate(prefixes):
6622            rotator = rotators[i]
6623            candidates = rotator.getFilesToDelete()
6624            self.assertEqual(len(candidates), 3, candidates)
6625            if prefix.startswith('a.b'):
6626                p = '%s.log.' % prefix
6627                for c in candidates:
6628                    d, fn = os.path.split(c)
6629                    self.assertTrue(fn.startswith(p))
6630            elif prefix.startswith('d.e'):
6631                for c in candidates:
6632                    d, fn = os.path.split(c)
6633                    self.assertTrue(fn.endswith('.log'), fn)
6634                    self.assertTrue(fn.startswith(prefix + '.') and
6635                                    fn[len(prefix) + 2].isdigit())
6636            elif prefix == 'g':
6637                for c in candidates:
6638                    d, fn = os.path.split(c)
6639                    self.assertTrue(fn.endswith('.oldlog'))
6640                    self.assertTrue(fn.startswith('g') and fn[1].isdigit())
6641
6642    def test_compute_files_to_delete_same_filename_different_extensions(self):
6643        # See GH-93205 for background
6644        wd = pathlib.Path(tempfile.mkdtemp(prefix='test_logging_'))
6645        self.addCleanup(shutil.rmtree, wd)
6646        times = []
6647        dt = datetime.datetime.now()
6648        n_files = 10
6649        for _ in range(n_files):
6650            times.append(dt.strftime('%Y-%m-%d_%H-%M-%S'))
6651            dt += datetime.timedelta(seconds=5)
6652        prefixes = ('a.log', 'a.log.b')
6653        files = []
6654        rotators = []
6655        for i, prefix in enumerate(prefixes):
6656            backupCount = i+1
6657            rotator = logging.handlers.TimedRotatingFileHandler(wd / prefix, when='s',
6658                                                                interval=5,
6659                                                                backupCount=backupCount,
6660                                                                delay=True)
6661            rotators.append(rotator)
6662            for t in times:
6663                files.append('%s.%s' % (prefix, t))
6664        for t in times:
6665            files.append('a.log.%s.c' % t)
6666        # Create empty files
6667        for f in files:
6668            (wd / f).touch()
6669        # Now the checks that only the correct files are offered up for deletion
6670        for i, prefix in enumerate(prefixes):
6671            backupCount = i+1
6672            rotator = rotators[i]
6673            candidates = rotator.getFilesToDelete()
6674            self.assertEqual(len(candidates), n_files - backupCount, candidates)
6675            matcher = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\Z")
6676            for c in candidates:
6677                d, fn = os.path.split(c)
6678                self.assertTrue(fn.startswith(prefix+'.'))
6679                suffix = fn[(len(prefix)+1):]
6680                self.assertRegex(suffix, matcher)
6681
    # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in
    # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0).
    @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0')
    def test_compute_rollover_MIDNIGHT_local(self):
        """computeRollover() with when='MIDNIGHT' in local time around
        both DST transitions, for several atTime values — including
        times that the transitions skip (spring forward) or repeat
        (fall back, distinguished by the ``fold`` attribute)."""
        # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00.
        DT = datetime.datetime
        def test(current, expected):
            # Compare as timestamps; report a mismatch as a timedelta
            # for easier debugging.
            actual = fh.computeRollover(current.timestamp())
            diff = actual - expected.timestamp()
            if diff:
                self.assertEqual(diff, 0, datetime.timedelta(seconds=diff))

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False)

        test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 11, 0, 0))
        test(DT(2012, 3, 11, 0, 0), DT(2012, 3, 12, 0, 0))
        test(DT(2012, 3, 11, 1, 0), DT(2012, 3, 12, 0, 0))

        test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 4, 0, 0))
        test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 5, 0, 0))
        test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 5, 0, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False,
            atTime=datetime.time(12, 0, 0))

        test(DT(2012, 3, 10, 11, 59, 59), DT(2012, 3, 10, 12, 0))
        test(DT(2012, 3, 10, 12, 0), DT(2012, 3, 11, 12, 0))
        test(DT(2012, 3, 10, 13, 0), DT(2012, 3, 11, 12, 0))

        test(DT(2012, 11, 3, 11, 59, 59), DT(2012, 11, 3, 12, 0))
        test(DT(2012, 11, 3, 12, 0), DT(2012, 11, 4, 12, 0))
        test(DT(2012, 11, 3, 13, 0), DT(2012, 11, 4, 12, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False,
            atTime=datetime.time(2, 0, 0))

        test(DT(2012, 3, 10, 1, 59, 59), DT(2012, 3, 10, 2, 0))
        # 2:00:00 is the same as 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 10, 2, 0), DT(2012, 3, 11, 3, 0))
        test(DT(2012, 3, 10, 3, 0), DT(2012, 3, 11, 3, 0))

        test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 0))
        # No time between 2:00:00 and 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 12, 2, 0))
        test(DT(2012, 3, 11, 4, 0), DT(2012, 3, 12, 2, 0))

        test(DT(2012, 11, 3, 1, 59, 59), DT(2012, 11, 3, 2, 0))
        test(DT(2012, 11, 3, 2, 0), DT(2012, 11, 4, 2, 0))
        test(DT(2012, 11, 3, 3, 0), DT(2012, 11, 4, 2, 0))

        # 1:00:00-2:00:00 is repeated twice at 2012-11-4.
        test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 4, 2, 0))
        test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 4, 2, 0))
        test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 5, 2, 0))
        test(DT(2012, 11, 4, 3, 0), DT(2012, 11, 5, 2, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False,
            atTime=datetime.time(2, 30, 0))

        test(DT(2012, 3, 10, 2, 29, 59), DT(2012, 3, 10, 2, 30))
        # No time 2:30:00 at 2012-3-11.
        test(DT(2012, 3, 10, 2, 30), DT(2012, 3, 11, 3, 30))
        test(DT(2012, 3, 10, 3, 0), DT(2012, 3, 11, 3, 30))

        test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 30))
        # No time between 2:00:00 and 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 12, 2, 30))
        test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 12, 2, 30))

        test(DT(2012, 11, 3, 2, 29, 59), DT(2012, 11, 3, 2, 30))
        test(DT(2012, 11, 3, 2, 30), DT(2012, 11, 4, 2, 30))
        test(DT(2012, 11, 3, 3, 0), DT(2012, 11, 4, 2, 30))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False,
            atTime=datetime.time(1, 30, 0))

        test(DT(2012, 3, 11, 1, 29, 59), DT(2012, 3, 11, 1, 30))
        test(DT(2012, 3, 11, 1, 30), DT(2012, 3, 12, 1, 30))
        test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 12, 1, 30))
        # No time between 2:00:00 and 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 12, 1, 30))
        test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 12, 1, 30))

        # 1:00:00-2:00:00 is repeated twice at 2012-11-4.
        test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 4, 1, 30))
        test(DT(2012, 11, 4, 1, 29, 59), DT(2012, 11, 4, 1, 30))
        test(DT(2012, 11, 4, 1, 30), DT(2012, 11, 5, 1, 30))
        test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 5, 1, 30))
        # It is weird, but the rollover date jumps back from 2012-11-5
        # to 2012-11-4.
        test(DT(2012, 11, 4, 1, 0, fold=1), DT(2012, 11, 4, 1, 30, fold=1))
        test(DT(2012, 11, 4, 1, 29, 59, fold=1), DT(2012, 11, 4, 1, 30, fold=1))
        test(DT(2012, 11, 4, 1, 30, fold=1), DT(2012, 11, 5, 1, 30))
        test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 5, 1, 30))
        test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 5, 1, 30))
        test(DT(2012, 11, 4, 2, 30), DT(2012, 11, 5, 1, 30))

        fh.close()
6793
    # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in
    # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0).
    @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0')
    def test_compute_rollover_W6_local(self):
        """computeRollover() with when='W6' (weekly, Sunday) in local
        time around both DST transitions, for several atTime values —
        including times skipped or repeated by the transitions."""
        # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00.
        DT = datetime.datetime
        def test(current, expected):
            # Compare as timestamps; report a mismatch as a timedelta
            # for easier debugging.
            actual = fh.computeRollover(current.timestamp())
            diff = actual - expected.timestamp()
            if diff:
                self.assertEqual(diff, 0, datetime.timedelta(seconds=diff))

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False)

        test(DT(2012, 3, 4, 23, 59, 59), DT(2012, 3, 5, 0, 0))
        test(DT(2012, 3, 5, 0, 0), DT(2012, 3, 12, 0, 0))
        test(DT(2012, 3, 5, 1, 0), DT(2012, 3, 12, 0, 0))

        test(DT(2012, 10, 28, 23, 59, 59), DT(2012, 10, 29, 0, 0))
        test(DT(2012, 10, 29, 0, 0), DT(2012, 11, 5, 0, 0))
        test(DT(2012, 10, 29, 1, 0), DT(2012, 11, 5, 0, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False,
            atTime=datetime.time(0, 0, 0))

        test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 11, 0, 0))
        test(DT(2012, 3, 11, 0, 0), DT(2012, 3, 18, 0, 0))
        test(DT(2012, 3, 11, 1, 0), DT(2012, 3, 18, 0, 0))

        test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 4, 0, 0))
        test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 11, 0, 0))
        test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 11, 0, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False,
            atTime=datetime.time(12, 0, 0))

        test(DT(2012, 3, 4, 11, 59, 59), DT(2012, 3, 4, 12, 0))
        test(DT(2012, 3, 4, 12, 0), DT(2012, 3, 11, 12, 0))
        test(DT(2012, 3, 4, 13, 0), DT(2012, 3, 11, 12, 0))

        test(DT(2012, 10, 28, 11, 59, 59), DT(2012, 10, 28, 12, 0))
        test(DT(2012, 10, 28, 12, 0), DT(2012, 11, 4, 12, 0))
        test(DT(2012, 10, 28, 13, 0), DT(2012, 11, 4, 12, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False,
            atTime=datetime.time(2, 0, 0))

        test(DT(2012, 3, 4, 1, 59, 59), DT(2012, 3, 4, 2, 0))
        # 2:00:00 is the same as 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 4, 2, 0), DT(2012, 3, 11, 3, 0))
        test(DT(2012, 3, 4, 3, 0), DT(2012, 3, 11, 3, 0))

        test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 0))
        # No time between 2:00:00 and 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 18, 2, 0))
        test(DT(2012, 3, 11, 4, 0), DT(2012, 3, 18, 2, 0))

        test(DT(2012, 10, 28, 1, 59, 59), DT(2012, 10, 28, 2, 0))
        test(DT(2012, 10, 28, 2, 0), DT(2012, 11, 4, 2, 0))
        test(DT(2012, 10, 28, 3, 0), DT(2012, 11, 4, 2, 0))

        # 1:00:00-2:00:00 is repeated twice at 2012-11-4.
        test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 4, 2, 0))
        test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 4, 2, 0))
        test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 11, 2, 0))
        test(DT(2012, 11, 4, 3, 0), DT(2012, 11, 11, 2, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False,
            atTime=datetime.time(2, 30, 0))

        test(DT(2012, 3, 4, 2, 29, 59), DT(2012, 3, 4, 2, 30))
        # No time 2:30:00 at 2012-3-11.
        test(DT(2012, 3, 4, 2, 30), DT(2012, 3, 11, 3, 30))
        test(DT(2012, 3, 4, 3, 0), DT(2012, 3, 11, 3, 30))

        test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 30))
        # No time between 2:00:00 and 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 18, 2, 30))
        test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 18, 2, 30))

        test(DT(2012, 10, 28, 2, 29, 59), DT(2012, 10, 28, 2, 30))
        test(DT(2012, 10, 28, 2, 30), DT(2012, 11, 4, 2, 30))
        test(DT(2012, 10, 28, 3, 0), DT(2012, 11, 4, 2, 30))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False,
            atTime=datetime.time(1, 30, 0))

        test(DT(2012, 3, 11, 1, 29, 59), DT(2012, 3, 11, 1, 30))
        test(DT(2012, 3, 11, 1, 30), DT(2012, 3, 18, 1, 30))
        test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 18, 1, 30))
        # No time between 2:00:00 and 3:00:00 at 2012-3-11.
        test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 18, 1, 30))
        test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 18, 1, 30))

        # 1:00:00-2:00:00 is repeated twice at 2012-11-4.
        test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 4, 1, 30))
        test(DT(2012, 11, 4, 1, 29, 59), DT(2012, 11, 4, 1, 30))
        test(DT(2012, 11, 4, 1, 30), DT(2012, 11, 11, 1, 30))
        test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 11, 1, 30))
        # It is weird, but the rollover date jumps back from 2012-11-11
        # to 2012-11-4.
        test(DT(2012, 11, 4, 1, 0, fold=1), DT(2012, 11, 4, 1, 30, fold=1))
        test(DT(2012, 11, 4, 1, 29, 59, fold=1), DT(2012, 11, 4, 1, 30, fold=1))
        test(DT(2012, 11, 4, 1, 30, fold=1), DT(2012, 11, 11, 1, 30))
        test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 11, 1, 30))
        test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 11, 1, 30))
        test(DT(2012, 11, 4, 2, 30), DT(2012, 11, 11, 1, 30))

        fh.close()
6919
    # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in
    # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0).
    @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0')
    def test_compute_rollover_MIDNIGHT_local_interval(self):
        """computeRollover() with when='MIDNIGHT' and interval=3 (every
        third day) in local time around both DST transitions."""
        # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00.
        DT = datetime.datetime
        def test(current, expected):
            # Compare as timestamps; report a mismatch as a timedelta
            # for easier debugging.
            actual = fh.computeRollover(current.timestamp())
            diff = actual - expected.timestamp()
            if diff:
                self.assertEqual(diff, 0, datetime.timedelta(seconds=diff))

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, interval=3)

        test(DT(2012, 3, 8, 23, 59, 59), DT(2012, 3, 11, 0, 0))
        test(DT(2012, 3, 9, 0, 0), DT(2012, 3, 12, 0, 0))
        test(DT(2012, 3, 9, 1, 0), DT(2012, 3, 12, 0, 0))
        test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 13, 0, 0))
        test(DT(2012, 3, 11, 0, 0), DT(2012, 3, 14, 0, 0))
        test(DT(2012, 3, 11, 1, 0), DT(2012, 3, 14, 0, 0))

        test(DT(2012, 11, 1, 23, 59, 59), DT(2012, 11, 4, 0, 0))
        test(DT(2012, 11, 2, 0, 0), DT(2012, 11, 5, 0, 0))
        test(DT(2012, 11, 2, 1, 0), DT(2012, 11, 5, 0, 0))
        test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 6, 0, 0))
        test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 7, 0, 0))
        test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 7, 0, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, interval=3,
            atTime=datetime.time(12, 0, 0))

        test(DT(2012, 3, 8, 11, 59, 59), DT(2012, 3, 10, 12, 0))
        test(DT(2012, 3, 8, 12, 0), DT(2012, 3, 11, 12, 0))
        test(DT(2012, 3, 8, 13, 0), DT(2012, 3, 11, 12, 0))
        test(DT(2012, 3, 10, 11, 59, 59), DT(2012, 3, 12, 12, 0))
        test(DT(2012, 3, 10, 12, 0), DT(2012, 3, 13, 12, 0))
        test(DT(2012, 3, 10, 13, 0), DT(2012, 3, 13, 12, 0))

        test(DT(2012, 11, 1, 11, 59, 59), DT(2012, 11, 3, 12, 0))
        test(DT(2012, 11, 1, 12, 0), DT(2012, 11, 4, 12, 0))
        test(DT(2012, 11, 1, 13, 0), DT(2012, 11, 4, 12, 0))
        test(DT(2012, 11, 3, 11, 59, 59), DT(2012, 11, 5, 12, 0))
        test(DT(2012, 11, 3, 12, 0), DT(2012, 11, 6, 12, 0))
        test(DT(2012, 11, 3, 13, 0), DT(2012, 11, 6, 12, 0))

        fh.close()
6970
    # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in
    # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0).
    @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0')
    def test_compute_rollover_W6_local_interval(self):
        """computeRollover() with when='W6' (Sunday) and interval=3
        (every third week) in local time around both DST transitions."""
        # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00.
        DT = datetime.datetime
        def test(current, expected):
            # Compare as timestamps; report a mismatch as a timedelta
            # for easier debugging.
            actual = fh.computeRollover(current.timestamp())
            diff = actual - expected.timestamp()
            if diff:
                self.assertEqual(diff, 0, datetime.timedelta(seconds=diff))

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False, interval=3)

        test(DT(2012, 2, 19, 23, 59, 59), DT(2012, 3, 5, 0, 0))
        test(DT(2012, 2, 20, 0, 0), DT(2012, 3, 12, 0, 0))
        test(DT(2012, 2, 20, 1, 0), DT(2012, 3, 12, 0, 0))
        test(DT(2012, 3, 4, 23, 59, 59), DT(2012, 3, 19, 0, 0))
        test(DT(2012, 3, 5, 0, 0), DT(2012, 3, 26, 0, 0))
        test(DT(2012, 3, 5, 1, 0), DT(2012, 3, 26, 0, 0))

        test(DT(2012, 10, 14, 23, 59, 59), DT(2012, 10, 29, 0, 0))
        test(DT(2012, 10, 15, 0, 0), DT(2012, 11, 5, 0, 0))
        test(DT(2012, 10, 15, 1, 0), DT(2012, 11, 5, 0, 0))
        test(DT(2012, 10, 28, 23, 59, 59), DT(2012, 11, 12, 0, 0))
        test(DT(2012, 10, 29, 0, 0), DT(2012, 11, 19, 0, 0))
        test(DT(2012, 10, 29, 1, 0), DT(2012, 11, 19, 0, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False, interval=3,
            atTime=datetime.time(0, 0, 0))

        test(DT(2012, 2, 25, 23, 59, 59), DT(2012, 3, 11, 0, 0))
        test(DT(2012, 2, 26, 0, 0), DT(2012, 3, 18, 0, 0))
        test(DT(2012, 2, 26, 1, 0), DT(2012, 3, 18, 0, 0))
        test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 25, 0, 0))
        test(DT(2012, 3, 11, 0, 0), DT(2012, 4, 1, 0, 0))
        test(DT(2012, 3, 11, 1, 0), DT(2012, 4, 1, 0, 0))

        test(DT(2012, 10, 20, 23, 59, 59), DT(2012, 11, 4, 0, 0))
        test(DT(2012, 10, 21, 0, 0), DT(2012, 11, 11, 0, 0))
        test(DT(2012, 10, 21, 1, 0), DT(2012, 11, 11, 0, 0))
        test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 18, 0, 0))
        test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 25, 0, 0))
        test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 25, 0, 0))

        fh.close()

        fh = logging.handlers.TimedRotatingFileHandler(
            self.fn, encoding="utf-8", when='W6', utc=False, interval=3,
            atTime=datetime.time(12, 0, 0))

        test(DT(2012, 2, 18, 11, 59, 59), DT(2012, 3, 4, 12, 0))
        test(DT(2012, 2, 19, 12, 0), DT(2012, 3, 11, 12, 0))
        test(DT(2012, 2, 19, 13, 0), DT(2012, 3, 11, 12, 0))
        test(DT(2012, 3, 4, 11, 59, 59), DT(2012, 3, 18, 12, 0))
        test(DT(2012, 3, 4, 12, 0), DT(2012, 3, 25, 12, 0))
        test(DT(2012, 3, 4, 13, 0), DT(2012, 3, 25, 12, 0))

        test(DT(2012, 10, 14, 11, 59, 59), DT(2012, 10, 28, 12, 0))
        test(DT(2012, 10, 14, 12, 0), DT(2012, 11, 4, 12, 0))
        test(DT(2012, 10, 14, 13, 0), DT(2012, 11, 4, 12, 0))
        test(DT(2012, 10, 28, 11, 59, 59), DT(2012, 11, 11, 12, 0))
        test(DT(2012, 10, 28, 12, 0), DT(2012, 11, 18, 12, 0))
        test(DT(2012, 10, 28, 13, 0), DT(2012, 11, 18, 12, 0))

        fh.close()
7041
7042
def secs(**kw):
    """Return the whole number of seconds in ``timedelta(**kw)``."""
    one_second = datetime.timedelta(seconds=1)
    return datetime.timedelta(**kw) // one_second
7045
# Dynamically add a test_compute_rollover_<when>[_interval] method to
# TimedRotatingFileHandlerTest for each 'when' specifier, checking the
# rollover computed from the epoch (1970-01-01T00:00:00 UTC, a Thursday).
for when, exp in (('S', 1),
                  ('M', 60),
                  ('H', 60 * 60),
                  ('D', 60 * 60 * 24),
                  ('MIDNIGHT', 60 * 60 * 24),
                  # current time (epoch start) is a Thursday, W0 means Monday
                  ('W0', secs(days=4, hours=24)),
                 ):
    for interval in 1, 3:
        # when/interval/exp are bound as default arguments so each
        # generated method captures the current loop values (avoids the
        # late-binding closure pitfall).
        def test_compute_rollover(self, when=when, interval=interval, exp=exp):
            rh = logging.handlers.TimedRotatingFileHandler(
                self.fn, encoding="utf-8", when=when, interval=interval, backupCount=0, utc=True)
            currentTime = 0.0
            actual = rh.computeRollover(currentTime)
            # Scale the single-interval expectation by the interval:
            # weekly rollovers add whole weeks, the rest multiply.
            if when.startswith('W'):
                exp += secs(days=7*(interval-1))
            else:
                exp *= interval
            if exp != actual:
                # Failures occur on some systems for MIDNIGHT and W0.
                # Print detailed calculation for MIDNIGHT so we can try to see
                # what's going on
                if when == 'MIDNIGHT':
                    try:
                        if rh.utc:
                            t = time.gmtime(currentTime)
                        else:
                            t = time.localtime(currentTime)
                        currentHour = t[3]
                        currentMinute = t[4]
                        currentSecond = t[5]
                        # r is the number of seconds left between now and midnight
                        r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
                                                        currentMinute) * 60 +
                                currentSecond)
                        result = currentTime + r
                        print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
                        print('currentHour: %s' % currentHour, file=sys.stderr)
                        print('currentMinute: %s' % currentMinute, file=sys.stderr)
                        print('currentSecond: %s' % currentSecond, file=sys.stderr)
                        print('r: %s' % r, file=sys.stderr)
                        print('result: %s' % result, file=sys.stderr)
                    except Exception as e:
                        print('exception in diagnostic code: %s' % e, file=sys.stderr)
            self.assertEqual(exp, actual)
            rh.close()
        name = "test_compute_rollover_%s" % when
        if interval > 1:
            name += "_interval"
        test_compute_rollover.__name__ = name
        setattr(TimedRotatingFileHandlerTest, name, test_compute_rollover)
7097
7098
@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
class NTEventLogHandlerTest(BaseTest):
    """Exercise NTEventLogHandler end-to-end against the Windows event log."""

    def test_basic(self):
        logtype = 'Application'
        elh = win32evtlog.OpenEventLog(None, logtype)
        # Record the log size up front so we can detect growth afterwards.
        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)

        try:
            h = logging.handlers.NTEventLogHandler('test_logging')
        except pywintypes.error as e:
            if e.winerror == 5:  # access denied
                raise unittest.SkipTest('Insufficient privileges to run test')
            raise

        record = logging.makeLogRecord({'msg': 'Test Log Message'})
        h.handle(record)
        h.close()
        # Now see if the event is recorded
        self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
        flags = (win32evtlog.EVENTLOG_BACKWARDS_READ |
                 win32evtlog.EVENTLOG_SEQUENTIAL_READ)
        GO_BACK = 100
        found = False
        # Scan backwards through recent records for the one we just logged.
        for event in win32evtlog.ReadEventLog(elh, flags, GO_BACK):
            if event.SourceName != 'test_logging':
                continue
            formatted = win32evtlogutil.SafeFormatMessage(event, logtype)
            if formatted == 'Test Log Message\r\n':
                found = True
                break
        msg = 'Record not found in event log, went back %d records' % GO_BACK
        self.assertTrue(found, msg=msg)
7133
7134
class MiscTestCase(unittest.TestCase):
    """Checks not tied to any particular handler or configuration API."""

    def test__all__(self):
        # Public-looking names that are deliberately absent from
        # logging.__all__.
        not_exported = {
            'currentframe', 'logThreads', 'logMultiprocessing',
            'logProcesses', 'logAsyncioTasks', 'threading',
            'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
            'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root',
        }
        support.check__all__(self, logging, not_exported=not_exported)
7143
7144
7145# Set the locale to the platform-dependent default.  I have no idea
7146# why the test does this, but in any case we save the current locale
7147# first and restore it at the end.
def setUpModule():
    """Run all tests under the platform-default locale.

    The context manager restores the previous locale automatically when
    the module's tests finish.
    """
    locale_cm = support.run_with_locale('LC_ALL', '')
    unittest.enterModuleContext(locale_cm)
7150
7151
if __name__ == "__main__":
    # Allow running this test module directly (python test_logging.py)
    # in addition to running it via the regression-test framework.
    unittest.main()
7154