1# Copyright 2001-2022 by Vinay Sajip. All Rights Reserved. 2# 3# Permission to use, copy, modify, and distribute this software and its 4# documentation for any purpose and without fee is hereby granted, 5# provided that the above copyright notice appear in all copies and that 6# both that copyright notice and this permission notice appear in 7# supporting documentation, and that the name of Vinay Sajip 8# not be used in advertising or publicity pertaining to distribution 9# of the software without specific, written prior permission. 10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING 11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR 13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER 14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 17"""Test harness for the logging module. Run all tests. 18 19Copyright (C) 2001-2022 Vinay Sajip. All Rights Reserved. 20""" 21import logging 22import logging.handlers 23import logging.config 24 25import codecs 26import configparser 27import copy 28import datetime 29import pathlib 30import pickle 31import io 32import itertools 33import gc 34import json 35import os 36import queue 37import random 38import re 39import shutil 40import socket 41import struct 42import sys 43import tempfile 44from test.support.script_helper import assert_python_ok, assert_python_failure 45from test import support 46from test.support import import_helper 47from test.support import os_helper 48from test.support import socket_helper 49from test.support import threading_helper 50from test.support import warnings_helper 51from test.support import asyncore 52from test.support import smtpd 53from test.support.logging_helper import TestHandler 54import textwrap 55import threading 56import asyncio 57import time 58import unittest 59import warnings 60import weakref 61 62from http.server import HTTPServer, BaseHTTPRequestHandler 63from unittest.mock import patch 64from urllib.parse import urlparse, parse_qs 65from socketserver import (ThreadingUDPServer, DatagramRequestHandler, 66 ThreadingTCPServer, StreamRequestHandler) 67 68try: 69 import win32evtlog, win32evtlogutil, pywintypes 70except ImportError: 71 win32evtlog = win32evtlogutil = pywintypes = None 72 73try: 74 import zlib 75except ImportError: 76 pass 77 78 79# gh-89363: Skip fork() test if Python is built with Address Sanitizer (ASAN) 80# to work around a libasan race condition, dead lock in pthread_create(). 81skip_if_asan_fork = unittest.skipIf( 82 support.HAVE_ASAN_FORK_BUG, 83 "libasan has a pthread_create() dead lock related to thread+fork") 84skip_if_tsan_fork = unittest.skipIf( 85 support.check_sanitizer(thread=True), 86 "TSAN doesn't support threads after fork") 87 88 89class BaseTest(unittest.TestCase): 90 91 """Base class for logging tests.""" 92 93 log_format = "%(name)s -> %(levelname)s: %(message)s" 94 expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" 95 message_num = 0 96 97 def setUp(self): 98 """Setup the default logging stream to an internal StringIO instance, 99 so that we can examine log output as we want.""" 100 self._threading_key = threading_helper.threading_setup() 101 102 logger_dict = logging.getLogger().manager.loggerDict 103 with logging._lock: 104 self.saved_handlers = logging._handlers.copy() 105 self.saved_handler_list = logging._handlerList[:] 106 self.saved_loggers = saved_loggers = logger_dict.copy() 107 self.saved_name_to_level = logging._nameToLevel.copy() 108 self.saved_level_to_name = logging._levelToName.copy() 109 self.logger_states = logger_states = {} 110 for name in saved_loggers: 111 logger_states[name] = getattr(saved_loggers[name], 112 'disabled', None) 113 114 # Set two unused loggers 115 self.logger1 = logging.getLogger("\xab\xd7\xbb") 116 self.logger2 = logging.getLogger("\u013f\u00d6\u0047") 117 118 self.root_logger = logging.getLogger("") 119 self.original_logging_level = self.root_logger.getEffectiveLevel() 120 121 self.stream = io.StringIO() 122 self.root_logger.setLevel(logging.DEBUG) 123 self.root_hdlr = logging.StreamHandler(self.stream) 124 self.root_formatter = logging.Formatter(self.log_format) 125 self.root_hdlr.setFormatter(self.root_formatter) 126 if self.logger1.hasHandlers(): 127 hlist = self.logger1.handlers + self.root_logger.handlers 128 raise AssertionError('Unexpected handlers: %s' % hlist) 129 if self.logger2.hasHandlers(): 130 hlist = self.logger2.handlers + self.root_logger.handlers 131 raise AssertionError('Unexpected handlers: %s' % hlist) 132 self.root_logger.addHandler(self.root_hdlr) 133 self.assertTrue(self.logger1.hasHandlers()) 134 self.assertTrue(self.logger2.hasHandlers()) 135 136 def tearDown(self): 137 """Remove our logging stream, and restore the original logging 138 level.""" 139 self.stream.close() 140 self.root_logger.removeHandler(self.root_hdlr) 141 while self.root_logger.handlers: 142 h = self.root_logger.handlers[0] 143 self.root_logger.removeHandler(h) 144 h.close() 145 self.root_logger.setLevel(self.original_logging_level) 146 with logging._lock: 147 logging._levelToName.clear() 148 logging._levelToName.update(self.saved_level_to_name) 149 logging._nameToLevel.clear() 150 logging._nameToLevel.update(self.saved_name_to_level) 151 logging._handlers.clear() 152 logging._handlers.update(self.saved_handlers) 153 logging._handlerList[:] = self.saved_handler_list 154 manager = logging.getLogger().manager 155 manager.disable = 0 156 loggerDict = manager.loggerDict 157 loggerDict.clear() 158 loggerDict.update(self.saved_loggers) 159 logger_states = self.logger_states 160 for name in self.logger_states: 161 if logger_states[name] is not None: 162 self.saved_loggers[name].disabled = logger_states[name] 163 164 self.doCleanups() 165 threading_helper.threading_cleanup(*self._threading_key) 166 167 def assert_log_lines(self, expected_values, stream=None, pat=None): 168 """Match the collected log lines against the regular expression 169 self.expected_log_pat, and compare the extracted group values to 170 the expected_values list of tuples.""" 171 stream = stream or self.stream 172 pat = re.compile(pat or self.expected_log_pat) 173 actual_lines = stream.getvalue().splitlines() 174 self.assertEqual(len(actual_lines), len(expected_values)) 175 for actual, expected in zip(actual_lines, expected_values): 176 match = pat.search(actual) 177 if not match: 178 self.fail("Log line does not match expected pattern:\n" + 179 actual) 180 self.assertEqual(tuple(match.groups()), expected) 181 s = stream.read() 182 if s: 183 self.fail("Remaining output at end of log stream:\n" + s) 184 185 def next_message(self): 186 """Generate a message consisting solely of an auto-incrementing 187 integer.""" 188 self.message_num += 1 189 return "%d" % self.message_num 190 191 192class BuiltinLevelsTest(BaseTest): 193 """Test builtin levels and their inheritance.""" 194 195 def test_flat(self): 196 # Logging levels in a flat logger namespace. 197 m = self.next_message 198 199 ERR = logging.getLogger("ERR") 200 ERR.setLevel(logging.ERROR) 201 INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) 202 INF.setLevel(logging.INFO) 203 DEB = logging.getLogger("DEB") 204 DEB.setLevel(logging.DEBUG) 205 206 # These should log. 207 ERR.log(logging.CRITICAL, m()) 208 ERR.error(m()) 209 210 INF.log(logging.CRITICAL, m()) 211 INF.error(m()) 212 INF.warning(m()) 213 INF.info(m()) 214 215 DEB.log(logging.CRITICAL, m()) 216 DEB.error(m()) 217 DEB.warning(m()) 218 DEB.info(m()) 219 DEB.debug(m()) 220 221 # These should not log. 222 ERR.warning(m()) 223 ERR.info(m()) 224 ERR.debug(m()) 225 226 INF.debug(m()) 227 228 self.assert_log_lines([ 229 ('ERR', 'CRITICAL', '1'), 230 ('ERR', 'ERROR', '2'), 231 ('INF', 'CRITICAL', '3'), 232 ('INF', 'ERROR', '4'), 233 ('INF', 'WARNING', '5'), 234 ('INF', 'INFO', '6'), 235 ('DEB', 'CRITICAL', '7'), 236 ('DEB', 'ERROR', '8'), 237 ('DEB', 'WARNING', '9'), 238 ('DEB', 'INFO', '10'), 239 ('DEB', 'DEBUG', '11'), 240 ]) 241 242 def test_nested_explicit(self): 243 # Logging levels in a nested namespace, all explicitly set. 244 m = self.next_message 245 246 INF = logging.getLogger("INF") 247 INF.setLevel(logging.INFO) 248 INF_ERR = logging.getLogger("INF.ERR") 249 INF_ERR.setLevel(logging.ERROR) 250 251 # These should log. 252 INF_ERR.log(logging.CRITICAL, m()) 253 INF_ERR.error(m()) 254 255 # These should not log. 256 INF_ERR.warning(m()) 257 INF_ERR.info(m()) 258 INF_ERR.debug(m()) 259 260 self.assert_log_lines([ 261 ('INF.ERR', 'CRITICAL', '1'), 262 ('INF.ERR', 'ERROR', '2'), 263 ]) 264 265 def test_nested_inherited(self): 266 # Logging levels in a nested namespace, inherited from parent loggers. 267 m = self.next_message 268 269 INF = logging.getLogger("INF") 270 INF.setLevel(logging.INFO) 271 INF_ERR = logging.getLogger("INF.ERR") 272 INF_ERR.setLevel(logging.ERROR) 273 INF_UNDEF = logging.getLogger("INF.UNDEF") 274 INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") 275 UNDEF = logging.getLogger("UNDEF") 276 277 # These should log. 278 INF_UNDEF.log(logging.CRITICAL, m()) 279 INF_UNDEF.error(m()) 280 INF_UNDEF.warning(m()) 281 INF_UNDEF.info(m()) 282 INF_ERR_UNDEF.log(logging.CRITICAL, m()) 283 INF_ERR_UNDEF.error(m()) 284 285 # These should not log. 286 INF_UNDEF.debug(m()) 287 INF_ERR_UNDEF.warning(m()) 288 INF_ERR_UNDEF.info(m()) 289 INF_ERR_UNDEF.debug(m()) 290 291 self.assert_log_lines([ 292 ('INF.UNDEF', 'CRITICAL', '1'), 293 ('INF.UNDEF', 'ERROR', '2'), 294 ('INF.UNDEF', 'WARNING', '3'), 295 ('INF.UNDEF', 'INFO', '4'), 296 ('INF.ERR.UNDEF', 'CRITICAL', '5'), 297 ('INF.ERR.UNDEF', 'ERROR', '6'), 298 ]) 299 300 def test_nested_with_virtual_parent(self): 301 # Logging levels when some parent does not exist yet. 302 m = self.next_message 303 304 INF = logging.getLogger("INF") 305 GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") 306 CHILD = logging.getLogger("INF.BADPARENT") 307 INF.setLevel(logging.INFO) 308 309 # These should log. 310 GRANDCHILD.log(logging.FATAL, m()) 311 GRANDCHILD.info(m()) 312 CHILD.log(logging.FATAL, m()) 313 CHILD.info(m()) 314 315 # These should not log. 316 GRANDCHILD.debug(m()) 317 CHILD.debug(m()) 318 319 self.assert_log_lines([ 320 ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), 321 ('INF.BADPARENT.UNDEF', 'INFO', '2'), 322 ('INF.BADPARENT', 'CRITICAL', '3'), 323 ('INF.BADPARENT', 'INFO', '4'), 324 ]) 325 326 def test_regression_22386(self): 327 """See issue #22386 for more information.""" 328 self.assertEqual(logging.getLevelName('INFO'), logging.INFO) 329 self.assertEqual(logging.getLevelName(logging.INFO), 'INFO') 330 331 def test_issue27935(self): 332 fatal = logging.getLevelName('FATAL') 333 self.assertEqual(fatal, logging.FATAL) 334 335 def test_regression_29220(self): 336 """See issue #29220 for more information.""" 337 logging.addLevelName(logging.INFO, '') 338 self.addCleanup(logging.addLevelName, logging.INFO, 'INFO') 339 self.assertEqual(logging.getLevelName(logging.INFO), '') 340 self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET') 341 self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET) 342 343class BasicFilterTest(BaseTest): 344 345 """Test the bundled Filter class.""" 346 347 def test_filter(self): 348 # Only messages satisfying the specified criteria pass through the 349 # filter. 350 filter_ = logging.Filter("spam.eggs") 351 handler = self.root_logger.handlers[0] 352 try: 353 handler.addFilter(filter_) 354 spam = logging.getLogger("spam") 355 spam_eggs = logging.getLogger("spam.eggs") 356 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 357 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 358 359 spam.info(self.next_message()) 360 spam_eggs.info(self.next_message()) # Good. 361 spam_eggs_fish.info(self.next_message()) # Good. 362 spam_bakedbeans.info(self.next_message()) 363 364 self.assert_log_lines([ 365 ('spam.eggs', 'INFO', '2'), 366 ('spam.eggs.fish', 'INFO', '3'), 367 ]) 368 finally: 369 handler.removeFilter(filter_) 370 371 def test_callable_filter(self): 372 # Only messages satisfying the specified criteria pass through the 373 # filter. 374 375 def filterfunc(record): 376 parts = record.name.split('.') 377 prefix = '.'.join(parts[:2]) 378 return prefix == 'spam.eggs' 379 380 handler = self.root_logger.handlers[0] 381 try: 382 handler.addFilter(filterfunc) 383 spam = logging.getLogger("spam") 384 spam_eggs = logging.getLogger("spam.eggs") 385 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 386 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 387 388 spam.info(self.next_message()) 389 spam_eggs.info(self.next_message()) # Good. 390 spam_eggs_fish.info(self.next_message()) # Good. 391 spam_bakedbeans.info(self.next_message()) 392 393 self.assert_log_lines([ 394 ('spam.eggs', 'INFO', '2'), 395 ('spam.eggs.fish', 'INFO', '3'), 396 ]) 397 finally: 398 handler.removeFilter(filterfunc) 399 400 def test_empty_filter(self): 401 f = logging.Filter() 402 r = logging.makeLogRecord({'name': 'spam.eggs'}) 403 self.assertTrue(f.filter(r)) 404 405# 406# First, we define our levels. There can be as many as you want - the only 407# limitations are that they should be integers, the lowest should be > 0 and 408# larger values mean less information being logged. If you need specific 409# level values which do not fit into these limitations, you can use a 410# mapping dictionary to convert between your application levels and the 411# logging system. 412# 413SILENT = 120 414TACITURN = 119 415TERSE = 118 416EFFUSIVE = 117 417SOCIABLE = 116 418VERBOSE = 115 419TALKATIVE = 114 420GARRULOUS = 113 421CHATTERBOX = 112 422BORING = 111 423 424LEVEL_RANGE = range(BORING, SILENT + 1) 425 426# 427# Next, we define names for our levels. You don't need to do this - in which 428# case the system will use "Level n" to denote the text for the level. 429# 430my_logging_levels = { 431 SILENT : 'Silent', 432 TACITURN : 'Taciturn', 433 TERSE : 'Terse', 434 EFFUSIVE : 'Effusive', 435 SOCIABLE : 'Sociable', 436 VERBOSE : 'Verbose', 437 TALKATIVE : 'Talkative', 438 GARRULOUS : 'Garrulous', 439 CHATTERBOX : 'Chatterbox', 440 BORING : 'Boring', 441} 442 443class GarrulousFilter(logging.Filter): 444 445 """A filter which blocks garrulous messages.""" 446 447 def filter(self, record): 448 return record.levelno != GARRULOUS 449 450class VerySpecificFilter(logging.Filter): 451 452 """A filter which blocks sociable and taciturn messages.""" 453 454 def filter(self, record): 455 return record.levelno not in [SOCIABLE, TACITURN] 456 457 458class CustomLevelsAndFiltersTest(BaseTest): 459 460 """Test various filtering possibilities with custom logging levels.""" 461 462 # Skip the logger name group. 463 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 464 465 def setUp(self): 466 BaseTest.setUp(self) 467 for k, v in my_logging_levels.items(): 468 logging.addLevelName(k, v) 469 470 def log_at_all_levels(self, logger): 471 for lvl in LEVEL_RANGE: 472 logger.log(lvl, self.next_message()) 473 474 def test_handler_filter_replaces_record(self): 475 def replace_message(record: logging.LogRecord): 476 record = copy.copy(record) 477 record.msg = "new message!" 478 return record 479 480 # Set up a logging hierarchy such that "child" and it's handler 481 # (and thus `replace_message()`) always get called before 482 # propagating up to "parent". 483 # Then we can confirm that `replace_message()` was able to 484 # replace the log record without having a side effect on 485 # other loggers or handlers. 486 parent = logging.getLogger("parent") 487 child = logging.getLogger("parent.child") 488 stream_1 = io.StringIO() 489 stream_2 = io.StringIO() 490 handler_1 = logging.StreamHandler(stream_1) 491 handler_2 = logging.StreamHandler(stream_2) 492 handler_2.addFilter(replace_message) 493 parent.addHandler(handler_1) 494 child.addHandler(handler_2) 495 496 child.info("original message") 497 handler_1.flush() 498 handler_2.flush() 499 self.assertEqual(stream_1.getvalue(), "original message\n") 500 self.assertEqual(stream_2.getvalue(), "new message!\n") 501 502 def test_logging_filter_replaces_record(self): 503 records = set() 504 505 class RecordingFilter(logging.Filter): 506 def filter(self, record: logging.LogRecord): 507 records.add(id(record)) 508 return copy.copy(record) 509 510 logger = logging.getLogger("logger") 511 logger.setLevel(logging.INFO) 512 logger.addFilter(RecordingFilter()) 513 logger.addFilter(RecordingFilter()) 514 515 logger.info("msg") 516 517 self.assertEqual(2, len(records)) 518 519 def test_logger_filter(self): 520 # Filter at logger level. 521 self.root_logger.setLevel(VERBOSE) 522 # Levels >= 'Verbose' are good. 523 self.log_at_all_levels(self.root_logger) 524 self.assert_log_lines([ 525 ('Verbose', '5'), 526 ('Sociable', '6'), 527 ('Effusive', '7'), 528 ('Terse', '8'), 529 ('Taciturn', '9'), 530 ('Silent', '10'), 531 ]) 532 533 def test_handler_filter(self): 534 # Filter at handler level. 535 self.root_logger.handlers[0].setLevel(SOCIABLE) 536 try: 537 # Levels >= 'Sociable' are good. 538 self.log_at_all_levels(self.root_logger) 539 self.assert_log_lines([ 540 ('Sociable', '6'), 541 ('Effusive', '7'), 542 ('Terse', '8'), 543 ('Taciturn', '9'), 544 ('Silent', '10'), 545 ]) 546 finally: 547 self.root_logger.handlers[0].setLevel(logging.NOTSET) 548 549 def test_specific_filters(self): 550 # Set a specific filter object on the handler, and then add another 551 # filter object on the logger itself. 552 handler = self.root_logger.handlers[0] 553 specific_filter = None 554 garr = GarrulousFilter() 555 handler.addFilter(garr) 556 try: 557 self.log_at_all_levels(self.root_logger) 558 first_lines = [ 559 # Notice how 'Garrulous' is missing 560 ('Boring', '1'), 561 ('Chatterbox', '2'), 562 ('Talkative', '4'), 563 ('Verbose', '5'), 564 ('Sociable', '6'), 565 ('Effusive', '7'), 566 ('Terse', '8'), 567 ('Taciturn', '9'), 568 ('Silent', '10'), 569 ] 570 self.assert_log_lines(first_lines) 571 572 specific_filter = VerySpecificFilter() 573 self.root_logger.addFilter(specific_filter) 574 self.log_at_all_levels(self.root_logger) 575 self.assert_log_lines(first_lines + [ 576 # Not only 'Garrulous' is still missing, but also 'Sociable' 577 # and 'Taciturn' 578 ('Boring', '11'), 579 ('Chatterbox', '12'), 580 ('Talkative', '14'), 581 ('Verbose', '15'), 582 ('Effusive', '17'), 583 ('Terse', '18'), 584 ('Silent', '20'), 585 ]) 586 finally: 587 if specific_filter: 588 self.root_logger.removeFilter(specific_filter) 589 handler.removeFilter(garr) 590 591 592def make_temp_file(*args, **kwargs): 593 fd, fn = tempfile.mkstemp(*args, **kwargs) 594 os.close(fd) 595 return fn 596 597 598class HandlerTest(BaseTest): 599 def test_name(self): 600 h = logging.Handler() 601 h.name = 'generic' 602 self.assertEqual(h.name, 'generic') 603 h.name = 'anothergeneric' 604 self.assertEqual(h.name, 'anothergeneric') 605 self.assertRaises(NotImplementedError, h.emit, None) 606 607 def test_builtin_handlers(self): 608 # We can't actually *use* too many handlers in the tests, 609 # but we can try instantiating them with various options 610 if sys.platform in ('linux', 'android', 'darwin'): 611 for existing in (True, False): 612 fn = make_temp_file() 613 if not existing: 614 os.unlink(fn) 615 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True) 616 if existing: 617 dev, ino = h.dev, h.ino 618 self.assertEqual(dev, -1) 619 self.assertEqual(ino, -1) 620 r = logging.makeLogRecord({'msg': 'Test'}) 621 h.handle(r) 622 # Now remove the file. 623 os.unlink(fn) 624 self.assertFalse(os.path.exists(fn)) 625 # The next call should recreate the file. 626 h.handle(r) 627 self.assertTrue(os.path.exists(fn)) 628 else: 629 self.assertEqual(h.dev, -1) 630 self.assertEqual(h.ino, -1) 631 h.close() 632 if existing: 633 os.unlink(fn) 634 if sys.platform == 'darwin': 635 sockname = '/var/run/syslog' 636 else: 637 sockname = '/dev/log' 638 try: 639 h = logging.handlers.SysLogHandler(sockname) 640 self.assertEqual(h.facility, h.LOG_USER) 641 self.assertTrue(h.unixsocket) 642 h.close() 643 except OSError: # syslogd might not be available 644 pass 645 for method in ('GET', 'POST', 'PUT'): 646 if method == 'PUT': 647 self.assertRaises(ValueError, logging.handlers.HTTPHandler, 648 'localhost', '/log', method) 649 else: 650 h = logging.handlers.HTTPHandler('localhost', '/log', method) 651 h.close() 652 h = logging.handlers.BufferingHandler(0) 653 r = logging.makeLogRecord({}) 654 self.assertTrue(h.shouldFlush(r)) 655 h.close() 656 h = logging.handlers.BufferingHandler(1) 657 self.assertFalse(h.shouldFlush(r)) 658 h.close() 659 660 def test_pathlike_objects(self): 661 """ 662 Test that path-like objects are accepted as filename arguments to handlers. 663 664 See Issue #27493. 665 """ 666 fn = make_temp_file() 667 os.unlink(fn) 668 pfn = os_helper.FakePath(fn) 669 cases = ( 670 (logging.FileHandler, (pfn, 'w')), 671 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 672 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 673 ) 674 if sys.platform in ('linux', 'android', 'darwin'): 675 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 676 for cls, args in cases: 677 h = cls(*args, encoding="utf-8") 678 self.assertTrue(os.path.exists(fn)) 679 h.close() 680 os.unlink(fn) 681 682 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 683 @unittest.skipIf( 684 support.is_emscripten, "Emscripten cannot fstat unlinked files." 685 ) 686 @threading_helper.requires_working_threading() 687 @support.requires_resource('walltime') 688 def test_race(self): 689 # Issue #14632 refers. 690 def remove_loop(fname, tries): 691 for _ in range(tries): 692 try: 693 os.unlink(fname) 694 self.deletion_time = time.time() 695 except OSError: 696 pass 697 time.sleep(0.004 * random.randint(0, 4)) 698 699 del_count = 500 700 log_count = 500 701 702 self.handle_time = None 703 self.deletion_time = None 704 705 for delay in (False, True): 706 fn = make_temp_file('.log', 'test_logging-3-') 707 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 708 remover.daemon = True 709 remover.start() 710 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay) 711 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 712 h.setFormatter(f) 713 try: 714 for _ in range(log_count): 715 time.sleep(0.005) 716 r = logging.makeLogRecord({'msg': 'testing' }) 717 try: 718 self.handle_time = time.time() 719 h.handle(r) 720 except Exception: 721 print('Deleted at %s, ' 722 'opened at %s' % (self.deletion_time, 723 self.handle_time)) 724 raise 725 finally: 726 remover.join() 727 h.close() 728 if os.path.exists(fn): 729 os.unlink(fn) 730 731 # The implementation relies on os.register_at_fork existing, but we test 732 # based on os.fork existing because that is what users and this test use. 733 # This helps ensure that when fork exists (the important concept) that the 734 # register_at_fork mechanism is also present and used. 735 @support.requires_fork() 736 @threading_helper.requires_working_threading() 737 @skip_if_asan_fork 738 @skip_if_tsan_fork 739 def test_post_fork_child_no_deadlock(self): 740 """Ensure child logging locks are not held; bpo-6721 & bpo-36533.""" 741 class _OurHandler(logging.Handler): 742 def __init__(self): 743 super().__init__() 744 self.sub_handler = logging.StreamHandler( 745 stream=open('/dev/null', 'wt', encoding='utf-8')) 746 747 def emit(self, record): 748 with self.sub_handler.lock: 749 self.sub_handler.emit(record) 750 751 self.assertEqual(len(logging._handlers), 0) 752 refed_h = _OurHandler() 753 self.addCleanup(refed_h.sub_handler.stream.close) 754 refed_h.name = 'because we need at least one for this test' 755 self.assertGreater(len(logging._handlers), 0) 756 self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1) 757 test_logger = logging.getLogger('test_post_fork_child_no_deadlock') 758 test_logger.addHandler(refed_h) 759 test_logger.setLevel(logging.DEBUG) 760 761 locks_held__ready_to_fork = threading.Event() 762 fork_happened__release_locks_and_end_thread = threading.Event() 763 764 def lock_holder_thread_fn(): 765 with logging._lock, refed_h.lock: 766 # Tell the main thread to do the fork. 767 locks_held__ready_to_fork.set() 768 769 # If the deadlock bug exists, the fork will happen 770 # without dealing with the locks we hold, deadlocking 771 # the child. 772 773 # Wait for a successful fork or an unreasonable amount of 774 # time before releasing our locks. To avoid a timing based 775 # test we'd need communication from os.fork() as to when it 776 # has actually happened. Given this is a regression test 777 # for a fixed issue, potentially less reliably detecting 778 # regression via timing is acceptable for simplicity. 779 # The test will always take at least this long. :( 780 fork_happened__release_locks_and_end_thread.wait(0.5) 781 782 lock_holder_thread = threading.Thread( 783 target=lock_holder_thread_fn, 784 name='test_post_fork_child_no_deadlock lock holder') 785 lock_holder_thread.start() 786 787 locks_held__ready_to_fork.wait() 788 pid = os.fork() 789 if pid == 0: 790 # Child process 791 try: 792 test_logger.info(r'Child process did not deadlock. \o/') 793 finally: 794 os._exit(0) 795 else: 796 # Parent process 797 test_logger.info(r'Parent process returned from fork. \o/') 798 fork_happened__release_locks_and_end_thread.set() 799 lock_holder_thread.join() 800 801 support.wait_process(pid, exitcode=0) 802 803 804class BadStream(object): 805 def write(self, data): 806 raise RuntimeError('deliberate mistake') 807 808class TestStreamHandler(logging.StreamHandler): 809 def handleError(self, record): 810 self.error_record = record 811 812class StreamWithIntName(object): 813 level = logging.NOTSET 814 name = 2 815 816class StreamHandlerTest(BaseTest): 817 def test_error_handling(self): 818 h = TestStreamHandler(BadStream()) 819 r = logging.makeLogRecord({}) 820 old_raise = logging.raiseExceptions 821 822 try: 823 h.handle(r) 824 self.assertIs(h.error_record, r) 825 826 h = logging.StreamHandler(BadStream()) 827 with support.captured_stderr() as stderr: 828 h.handle(r) 829 msg = '\nRuntimeError: deliberate mistake\n' 830 self.assertIn(msg, stderr.getvalue()) 831 832 logging.raiseExceptions = False 833 with support.captured_stderr() as stderr: 834 h.handle(r) 835 self.assertEqual('', stderr.getvalue()) 836 finally: 837 logging.raiseExceptions = old_raise 838 839 def test_stream_setting(self): 840 """ 841 Test setting the handler's stream 842 """ 843 h = logging.StreamHandler() 844 stream = io.StringIO() 845 old = h.setStream(stream) 846 self.assertIs(old, sys.stderr) 847 actual = h.setStream(old) 848 self.assertIs(actual, stream) 849 # test that setting to existing value returns None 850 actual = h.setStream(old) 851 self.assertIsNone(actual) 852 853 def test_can_represent_stream_with_int_name(self): 854 h = logging.StreamHandler(StreamWithIntName()) 855 self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>') 856 857# -- The following section could be moved into a server_helper.py module 858# -- if it proves to be of wider utility than just test_logging 859 860class TestSMTPServer(smtpd.SMTPServer): 861 """ 862 This class implements a test SMTP server. 863 864 :param addr: A (host, port) tuple which the server listens on. 865 You can specify a port value of zero: the server's 866 *port* attribute will hold the actual port number 867 used, which can be used in client connections. 868 :param handler: A callable which will be called to process 869 incoming messages. The handler will be passed 870 the client address tuple, who the message is from, 871 a list of recipients and the message data. 872 :param poll_interval: The interval, in seconds, used in the underlying 873 :func:`select` or :func:`poll` call by 874 :func:`asyncore.loop`. 875 :param sockmap: A dictionary which will be used to hold 876 :class:`asyncore.dispatcher` instances used by 877 :func:`asyncore.loop`. This avoids changing the 878 :mod:`asyncore` module's global state. 879 """ 880 881 def __init__(self, addr, handler, poll_interval, sockmap): 882 smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, 883 decode_data=True) 884 self.port = self.socket.getsockname()[1] 885 self._handler = handler 886 self._thread = None 887 self._quit = False 888 self.poll_interval = poll_interval 889 890 def process_message(self, peer, mailfrom, rcpttos, data): 891 """ 892 Delegates to the handler passed in to the server's constructor. 893 894 Typically, this will be a test case method. 895 :param peer: The client (host, port) tuple. 896 :param mailfrom: The address of the sender. 897 :param rcpttos: The addresses of the recipients. 898 :param data: The message. 899 """ 900 self._handler(peer, mailfrom, rcpttos, data) 901 902 def start(self): 903 """ 904 Start the server running on a separate daemon thread. 905 """ 906 self._thread = t = threading.Thread(target=self.serve_forever, 907 args=(self.poll_interval,)) 908 t.daemon = True 909 t.start() 910 911 def serve_forever(self, poll_interval): 912 """ 913 Run the :mod:`asyncore` loop until normal termination 914 conditions arise. 915 :param poll_interval: The interval, in seconds, used in the underlying 916 :func:`select` or :func:`poll` call by 917 :func:`asyncore.loop`. 918 """ 919 while not self._quit: 920 asyncore.loop(poll_interval, map=self._map, count=1) 921 922 def stop(self): 923 """ 924 Stop the thread by closing the server instance. 925 Wait for the server thread to terminate. 926 """ 927 self._quit = True 928 threading_helper.join_thread(self._thread) 929 self._thread = None 930 self.close() 931 asyncore.close_all(map=self._map, ignore_all=True) 932 933 934class ControlMixin(object): 935 """ 936 This mixin is used to start a server on a separate thread, and 937 shut it down programmatically. Request handling is simplified - instead 938 of needing to derive a suitable RequestHandler subclass, you just 939 provide a callable which will be passed each received request to be 940 processed. 941 942 :param handler: A handler callable which will be called with a 943 single parameter - the request - in order to 944 process the request. This handler is called on the 945 server thread, effectively meaning that requests are 946 processed serially. While not quite web scale ;-), 947 this should be fine for testing applications. 948 :param poll_interval: The polling interval in seconds. 949 """ 950 def __init__(self, handler, poll_interval): 951 self._thread = None 952 self.poll_interval = poll_interval 953 self._handler = handler 954 self.ready = threading.Event() 955 956 def start(self): 957 """ 958 Create a daemon thread to run the server, and start it. 959 """ 960 self._thread = t = threading.Thread(target=self.serve_forever, 961 args=(self.poll_interval,)) 962 t.daemon = True 963 t.start() 964 965 def serve_forever(self, poll_interval): 966 """ 967 Run the server. Set the ready flag before entering the 968 service loop. 969 """ 970 self.ready.set() 971 super(ControlMixin, self).serve_forever(poll_interval) 972 973 def stop(self): 974 """ 975 Tell the server thread to stop, and wait for it to do so. 976 """ 977 self.shutdown() 978 if self._thread is not None: 979 threading_helper.join_thread(self._thread) 980 self._thread = None 981 self.server_close() 982 self.ready.clear() 983 984class TestHTTPServer(ControlMixin, HTTPServer): 985 """ 986 An HTTP server which is controllable using :class:`ControlMixin`. 987 988 :param addr: A tuple with the IP address and port to listen on. 989 :param handler: A handler callable which will be called with a 990 single parameter - the request - in order to 991 process the request. 992 :param poll_interval: The polling interval in seconds. 993 :param log: Pass ``True`` to enable log messages. 994 """ 995 def __init__(self, addr, handler, poll_interval=0.5, 996 log=False, sslctx=None): 997 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 998 def __getattr__(self, name, default=None): 999 if name.startswith('do_'): 1000 return self.process_request 1001 raise AttributeError(name) 1002 1003 def process_request(self): 1004 self.server._handler(self) 1005 1006 def log_message(self, format, *args): 1007 if log: 1008 super(DelegatingHTTPRequestHandler, 1009 self).log_message(format, *args) 1010 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 1011 ControlMixin.__init__(self, handler, poll_interval) 1012 self.sslctx = sslctx 1013 1014 def get_request(self): 1015 try: 1016 sock, addr = self.socket.accept() 1017 if self.sslctx: 1018 sock = self.sslctx.wrap_socket(sock, server_side=True) 1019 except OSError as e: 1020 # socket errors are silenced by the caller, print them here 1021 sys.stderr.write("Got an error:\n%s\n" % e) 1022 raise 1023 return sock, addr 1024 1025class TestTCPServer(ControlMixin, ThreadingTCPServer): 1026 """ 1027 A TCP server which is controllable using :class:`ControlMixin`. 1028 1029 :param addr: A tuple with the IP address and port to listen on. 1030 :param handler: A handler callable which will be called with a single 1031 parameter - the request - in order to process the request. 1032 :param poll_interval: The polling interval in seconds. 1033 :bind_and_activate: If True (the default), binds the server and starts it 1034 listening. If False, you need to call 1035 :meth:`server_bind` and :meth:`server_activate` at 1036 some later time before calling :meth:`start`, so that 1037 the server will set up the socket and listen on it. 1038 """ 1039 1040 allow_reuse_address = True 1041 1042 def __init__(self, addr, handler, poll_interval=0.5, 1043 bind_and_activate=True): 1044 class DelegatingTCPRequestHandler(StreamRequestHandler): 1045 1046 def handle(self): 1047 self.server._handler(self) 1048 ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, 1049 bind_and_activate) 1050 ControlMixin.__init__(self, handler, poll_interval) 1051 1052 def server_bind(self): 1053 super(TestTCPServer, self).server_bind() 1054 self.port = self.socket.getsockname()[1] 1055 1056class TestUDPServer(ControlMixin, ThreadingUDPServer): 1057 """ 1058 A UDP server which is controllable using :class:`ControlMixin`. 1059 1060 :param addr: A tuple with the IP address and port to listen on. 1061 :param handler: A handler callable which will be called with a 1062 single parameter - the request - in order to 1063 process the request. 1064 :param poll_interval: The polling interval for shutdown requests, 1065 in seconds. 1066 :bind_and_activate: If True (the default), binds the server and 1067 starts it listening. If False, you need to 1068 call :meth:`server_bind` and 1069 :meth:`server_activate` at some later time 1070 before calling :meth:`start`, so that the server will 1071 set up the socket and listen on it. 1072 """ 1073 def __init__(self, addr, handler, poll_interval=0.5, 1074 bind_and_activate=True): 1075 class DelegatingUDPRequestHandler(DatagramRequestHandler): 1076 1077 def handle(self): 1078 self.server._handler(self) 1079 1080 def finish(self): 1081 data = self.wfile.getvalue() 1082 if data: 1083 try: 1084 super(DelegatingUDPRequestHandler, self).finish() 1085 except OSError: 1086 if not self.server._closed: 1087 raise 1088 1089 ThreadingUDPServer.__init__(self, addr, 1090 DelegatingUDPRequestHandler, 1091 bind_and_activate) 1092 ControlMixin.__init__(self, handler, poll_interval) 1093 self._closed = False 1094 1095 def server_bind(self): 1096 super(TestUDPServer, self).server_bind() 1097 self.port = self.socket.getsockname()[1] 1098 1099 def server_close(self): 1100 super(TestUDPServer, self).server_close() 1101 self._closed = True 1102 1103if hasattr(socket, "AF_UNIX"): 1104 class TestUnixStreamServer(TestTCPServer): 1105 address_family = socket.AF_UNIX 1106 1107 class TestUnixDatagramServer(TestUDPServer): 1108 address_family = socket.AF_UNIX 1109 1110# - end of server_helper section 1111 1112@support.requires_working_socket() 1113@threading_helper.requires_working_threading() 1114class SMTPHandlerTest(BaseTest): 1115 # bpo-14314, bpo-19665, bpo-34092: don't wait forever 1116 TIMEOUT = support.LONG_TIMEOUT 1117 1118 def test_basic(self): 1119 sockmap = {} 1120 server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001, 1121 sockmap) 1122 server.start() 1123 addr = (socket_helper.HOST, server.port) 1124 h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', 1125 timeout=self.TIMEOUT) 1126 self.assertEqual(h.toaddrs, ['you']) 1127 self.messages = [] 1128 r = logging.makeLogRecord({'msg': 'Hello \u2713'}) 1129 self.handled = threading.Event() 1130 h.handle(r) 1131 self.handled.wait(self.TIMEOUT) 1132 server.stop() 1133 self.assertTrue(self.handled.is_set()) 1134 self.assertEqual(len(self.messages), 1) 1135 peer, mailfrom, rcpttos, data = self.messages[0] 1136 self.assertEqual(mailfrom, 'me') 1137 self.assertEqual(rcpttos, ['you']) 1138 self.assertIn('\nSubject: Log\n', data) 1139 self.assertTrue(data.endswith('\n\nHello \u2713')) 1140 h.close() 1141 1142 def process_message(self, *args): 1143 self.messages.append(args) 1144 self.handled.set() 1145 1146class MemoryHandlerTest(BaseTest): 1147 1148 """Tests for the MemoryHandler.""" 1149 1150 # Do not bother with a logger name group. 1151 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 1152 1153 def setUp(self): 1154 BaseTest.setUp(self) 1155 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1156 self.root_hdlr) 1157 self.mem_logger = logging.getLogger('mem') 1158 self.mem_logger.propagate = 0 1159 self.mem_logger.addHandler(self.mem_hdlr) 1160 1161 def tearDown(self): 1162 self.mem_hdlr.close() 1163 BaseTest.tearDown(self) 1164 1165 def test_flush(self): 1166 # The memory handler flushes to its target handler based on specific 1167 # criteria (message count and message level). 1168 self.mem_logger.debug(self.next_message()) 1169 self.assert_log_lines([]) 1170 self.mem_logger.info(self.next_message()) 1171 self.assert_log_lines([]) 1172 # This will flush because the level is >= logging.WARNING 1173 self.mem_logger.warning(self.next_message()) 1174 lines = [ 1175 ('DEBUG', '1'), 1176 ('INFO', '2'), 1177 ('WARNING', '3'), 1178 ] 1179 self.assert_log_lines(lines) 1180 for n in (4, 14): 1181 for i in range(9): 1182 self.mem_logger.debug(self.next_message()) 1183 self.assert_log_lines(lines) 1184 # This will flush because it's the 10th message since the last 1185 # flush. 1186 self.mem_logger.debug(self.next_message()) 1187 lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] 1188 self.assert_log_lines(lines) 1189 1190 self.mem_logger.debug(self.next_message()) 1191 self.assert_log_lines(lines) 1192 1193 def test_flush_on_close(self): 1194 """ 1195 Test that the flush-on-close configuration works as expected. 1196 """ 1197 self.mem_logger.debug(self.next_message()) 1198 self.assert_log_lines([]) 1199 self.mem_logger.info(self.next_message()) 1200 self.assert_log_lines([]) 1201 self.mem_logger.removeHandler(self.mem_hdlr) 1202 # Default behaviour is to flush on close. Check that it happens. 1203 self.mem_hdlr.close() 1204 lines = [ 1205 ('DEBUG', '1'), 1206 ('INFO', '2'), 1207 ] 1208 self.assert_log_lines(lines) 1209 # Now configure for flushing not to be done on close. 1210 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1211 self.root_hdlr, 1212 False) 1213 self.mem_logger.addHandler(self.mem_hdlr) 1214 self.mem_logger.debug(self.next_message()) 1215 self.assert_log_lines(lines) # no change 1216 self.mem_logger.info(self.next_message()) 1217 self.assert_log_lines(lines) # no change 1218 self.mem_logger.removeHandler(self.mem_hdlr) 1219 self.mem_hdlr.close() 1220 # assert that no new lines have been added 1221 self.assert_log_lines(lines) # no change 1222 1223 def test_shutdown_flush_on_close(self): 1224 """ 1225 Test that the flush-on-close configuration is respected by the 1226 shutdown method. 1227 """ 1228 self.mem_logger.debug(self.next_message()) 1229 self.assert_log_lines([]) 1230 self.mem_logger.info(self.next_message()) 1231 self.assert_log_lines([]) 1232 # Default behaviour is to flush on close. Check that it happens. 1233 logging.shutdown(handlerList=[logging.weakref.ref(self.mem_hdlr)]) 1234 lines = [ 1235 ('DEBUG', '1'), 1236 ('INFO', '2'), 1237 ] 1238 self.assert_log_lines(lines) 1239 # Now configure for flushing not to be done on close. 1240 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1241 self.root_hdlr, 1242 False) 1243 self.mem_logger.addHandler(self.mem_hdlr) 1244 self.mem_logger.debug(self.next_message()) 1245 self.assert_log_lines(lines) # no change 1246 self.mem_logger.info(self.next_message()) 1247 self.assert_log_lines(lines) # no change 1248 # assert that no new lines have been added after shutdown 1249 logging.shutdown(handlerList=[logging.weakref.ref(self.mem_hdlr)]) 1250 self.assert_log_lines(lines) # no change 1251 1252 @threading_helper.requires_working_threading() 1253 def test_race_between_set_target_and_flush(self): 1254 class MockRaceConditionHandler: 1255 def __init__(self, mem_hdlr): 1256 self.mem_hdlr = mem_hdlr 1257 self.threads = [] 1258 1259 def removeTarget(self): 1260 self.mem_hdlr.setTarget(None) 1261 1262 def handle(self, msg): 1263 thread = threading.Thread(target=self.removeTarget) 1264 self.threads.append(thread) 1265 thread.start() 1266 1267 target = MockRaceConditionHandler(self.mem_hdlr) 1268 try: 1269 self.mem_hdlr.setTarget(target) 1270 1271 for _ in range(10): 1272 time.sleep(0.005) 1273 self.mem_logger.info("not flushed") 1274 self.mem_logger.warning("flushed") 1275 finally: 1276 for thread in target.threads: 1277 threading_helper.join_thread(thread) 1278 1279 1280class ExceptionFormatter(logging.Formatter): 1281 """A special exception formatter.""" 1282 def formatException(self, ei): 1283 return "Got a [%s]" % ei[0].__name__ 1284 1285def closeFileHandler(h, fn): 1286 h.close() 1287 os.remove(fn) 1288 1289class ConfigFileTest(BaseTest): 1290 1291 """Reading logging config from a .ini-style config file.""" 1292 1293 check_no_resource_warning = warnings_helper.check_no_resource_warning 1294 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1295 1296 # config0 is a standard configuration. 1297 config0 = """ 1298 [loggers] 1299 keys=root 1300 1301 [handlers] 1302 keys=hand1 1303 1304 [formatters] 1305 keys=form1 1306 1307 [logger_root] 1308 level=WARNING 1309 handlers=hand1 1310 1311 [handler_hand1] 1312 class=StreamHandler 1313 level=NOTSET 1314 formatter=form1 1315 args=(sys.stdout,) 1316 1317 [formatter_form1] 1318 format=%(levelname)s ++ %(message)s 1319 datefmt= 1320 """ 1321 1322 # config1 adds a little to the standard configuration. 1323 config1 = """ 1324 [loggers] 1325 keys=root,parser 1326 1327 [handlers] 1328 keys=hand1 1329 1330 [formatters] 1331 keys=form1 1332 1333 [logger_root] 1334 level=WARNING 1335 handlers= 1336 1337 [logger_parser] 1338 level=DEBUG 1339 handlers=hand1 1340 propagate=1 1341 qualname=compiler.parser 1342 1343 [handler_hand1] 1344 class=StreamHandler 1345 level=NOTSET 1346 formatter=form1 1347 args=(sys.stdout,) 1348 1349 [formatter_form1] 1350 format=%(levelname)s ++ %(message)s 1351 datefmt= 1352 """ 1353 1354 # config1a moves the handler to the root. 1355 config1a = """ 1356 [loggers] 1357 keys=root,parser 1358 1359 [handlers] 1360 keys=hand1 1361 1362 [formatters] 1363 keys=form1 1364 1365 [logger_root] 1366 level=WARNING 1367 handlers=hand1 1368 1369 [logger_parser] 1370 level=DEBUG 1371 handlers= 1372 propagate=1 1373 qualname=compiler.parser 1374 1375 [handler_hand1] 1376 class=StreamHandler 1377 level=NOTSET 1378 formatter=form1 1379 args=(sys.stdout,) 1380 1381 [formatter_form1] 1382 format=%(levelname)s ++ %(message)s 1383 datefmt= 1384 """ 1385 1386 # config2 has a subtle configuration error that should be reported 1387 config2 = config1.replace("sys.stdout", "sys.stbout") 1388 1389 # config3 has a less subtle configuration error 1390 config3 = config1.replace("formatter=form1", "formatter=misspelled_name") 1391 1392 # config4 specifies a custom formatter class to be loaded 1393 config4 = """ 1394 [loggers] 1395 keys=root 1396 1397 [handlers] 1398 keys=hand1 1399 1400 [formatters] 1401 keys=form1 1402 1403 [logger_root] 1404 level=NOTSET 1405 handlers=hand1 1406 1407 [handler_hand1] 1408 class=StreamHandler 1409 level=NOTSET 1410 formatter=form1 1411 args=(sys.stdout,) 1412 1413 [formatter_form1] 1414 class=""" + __name__ + """.ExceptionFormatter 1415 format=%(levelname)s:%(name)s:%(message)s 1416 datefmt= 1417 """ 1418 1419 # config5 specifies a custom handler class to be loaded 1420 config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') 1421 1422 # config6 uses ', ' delimiters in the handlers and formatters sections 1423 config6 = """ 1424 [loggers] 1425 keys=root,parser 1426 1427 [handlers] 1428 keys=hand1, hand2 1429 1430 [formatters] 1431 keys=form1, form2 1432 1433 [logger_root] 1434 level=WARNING 1435 handlers= 1436 1437 [logger_parser] 1438 level=DEBUG 1439 handlers=hand1 1440 propagate=1 1441 qualname=compiler.parser 1442 1443 [handler_hand1] 1444 class=StreamHandler 1445 level=NOTSET 1446 formatter=form1 1447 args=(sys.stdout,) 1448 1449 [handler_hand2] 1450 class=StreamHandler 1451 level=NOTSET 1452 formatter=form1 1453 args=(sys.stderr,) 1454 1455 [formatter_form1] 1456 format=%(levelname)s ++ %(message)s 1457 datefmt= 1458 1459 [formatter_form2] 1460 format=%(message)s 1461 datefmt= 1462 """ 1463 1464 # config7 adds a compiler logger, and uses kwargs instead of args. 1465 config7 = """ 1466 [loggers] 1467 keys=root,parser,compiler 1468 1469 [handlers] 1470 keys=hand1 1471 1472 [formatters] 1473 keys=form1 1474 1475 [logger_root] 1476 level=WARNING 1477 handlers=hand1 1478 1479 [logger_compiler] 1480 level=DEBUG 1481 handlers= 1482 propagate=1 1483 qualname=compiler 1484 1485 [logger_parser] 1486 level=DEBUG 1487 handlers= 1488 propagate=1 1489 qualname=compiler.parser 1490 1491 [handler_hand1] 1492 class=StreamHandler 1493 level=NOTSET 1494 formatter=form1 1495 kwargs={'stream': sys.stdout,} 1496 1497 [formatter_form1] 1498 format=%(levelname)s ++ %(message)s 1499 datefmt= 1500 """ 1501 1502 # config 8, check for resource warning 1503 config8 = r""" 1504 [loggers] 1505 keys=root 1506 1507 [handlers] 1508 keys=file 1509 1510 [formatters] 1511 keys= 1512 1513 [logger_root] 1514 level=DEBUG 1515 handlers=file 1516 1517 [handler_file] 1518 class=FileHandler 1519 level=DEBUG 1520 args=("{tempfile}",) 1521 kwargs={{"encoding": "utf-8"}} 1522 """ 1523 1524 1525 config9 = """ 1526 [loggers] 1527 keys=root 1528 1529 [handlers] 1530 keys=hand1 1531 1532 [formatters] 1533 keys=form1 1534 1535 [logger_root] 1536 level=WARNING 1537 handlers=hand1 1538 1539 [handler_hand1] 1540 class=StreamHandler 1541 level=NOTSET 1542 formatter=form1 1543 args=(sys.stdout,) 1544 1545 [formatter_form1] 1546 format=%(message)s ++ %(customfield)s 1547 defaults={"customfield": "defaultvalue"} 1548 """ 1549 1550 disable_test = """ 1551 [loggers] 1552 keys=root 1553 1554 [handlers] 1555 keys=screen 1556 1557 [formatters] 1558 keys= 1559 1560 [logger_root] 1561 level=DEBUG 1562 handlers=screen 1563 1564 [handler_screen] 1565 level=DEBUG 1566 class=StreamHandler 1567 args=(sys.stdout,) 1568 formatter= 1569 """ 1570 1571 def apply_config(self, conf, **kwargs): 1572 file = io.StringIO(textwrap.dedent(conf)) 1573 logging.config.fileConfig(file, encoding="utf-8", **kwargs) 1574 1575 def test_config0_ok(self): 1576 # A simple config file which overrides the default settings. 1577 with support.captured_stdout() as output: 1578 self.apply_config(self.config0) 1579 logger = logging.getLogger() 1580 # Won't output anything 1581 logger.info(self.next_message()) 1582 # Outputs a message 1583 logger.error(self.next_message()) 1584 self.assert_log_lines([ 1585 ('ERROR', '2'), 1586 ], stream=output) 1587 # Original logger output is empty. 1588 self.assert_log_lines([]) 1589 1590 def test_config0_using_cp_ok(self): 1591 # A simple config file which overrides the default settings. 1592 with support.captured_stdout() as output: 1593 file = io.StringIO(textwrap.dedent(self.config0)) 1594 cp = configparser.ConfigParser() 1595 cp.read_file(file) 1596 logging.config.fileConfig(cp) 1597 logger = logging.getLogger() 1598 # Won't output anything 1599 logger.info(self.next_message()) 1600 # Outputs a message 1601 logger.error(self.next_message()) 1602 self.assert_log_lines([ 1603 ('ERROR', '2'), 1604 ], stream=output) 1605 # Original logger output is empty. 1606 self.assert_log_lines([]) 1607 1608 def test_config1_ok(self, config=config1): 1609 # A config file defining a sub-parser as well. 1610 with support.captured_stdout() as output: 1611 self.apply_config(config) 1612 logger = logging.getLogger("compiler.parser") 1613 # Both will output a message 1614 logger.info(self.next_message()) 1615 logger.error(self.next_message()) 1616 self.assert_log_lines([ 1617 ('INFO', '1'), 1618 ('ERROR', '2'), 1619 ], stream=output) 1620 # Original logger output is empty. 1621 self.assert_log_lines([]) 1622 1623 def test_config2_failure(self): 1624 # A simple config file which overrides the default settings. 1625 self.assertRaises(Exception, self.apply_config, self.config2) 1626 1627 def test_config3_failure(self): 1628 # A simple config file which overrides the default settings. 1629 self.assertRaises(Exception, self.apply_config, self.config3) 1630 1631 def test_config4_ok(self): 1632 # A config file specifying a custom formatter class. 1633 with support.captured_stdout() as output: 1634 self.apply_config(self.config4) 1635 logger = logging.getLogger() 1636 try: 1637 raise RuntimeError() 1638 except RuntimeError: 1639 logging.exception("just testing") 1640 sys.stdout.seek(0) 1641 self.assertEqual(output.getvalue(), 1642 "ERROR:root:just testing\nGot a [RuntimeError]\n") 1643 # Original logger output is empty 1644 self.assert_log_lines([]) 1645 1646 def test_config5_ok(self): 1647 self.test_config1_ok(config=self.config5) 1648 1649 def test_config6_ok(self): 1650 self.test_config1_ok(config=self.config6) 1651 1652 def test_config7_ok(self): 1653 with support.captured_stdout() as output: 1654 self.apply_config(self.config1a) 1655 logger = logging.getLogger("compiler.parser") 1656 # See issue #11424. compiler-hyphenated sorts 1657 # between compiler and compiler.xyz and this 1658 # was preventing compiler.xyz from being included 1659 # in the child loggers of compiler because of an 1660 # overzealous loop termination condition. 1661 hyphenated = logging.getLogger('compiler-hyphenated') 1662 # All will output a message 1663 logger.info(self.next_message()) 1664 logger.error(self.next_message()) 1665 hyphenated.critical(self.next_message()) 1666 self.assert_log_lines([ 1667 ('INFO', '1'), 1668 ('ERROR', '2'), 1669 ('CRITICAL', '3'), 1670 ], stream=output) 1671 # Original logger output is empty. 1672 self.assert_log_lines([]) 1673 with support.captured_stdout() as output: 1674 self.apply_config(self.config7) 1675 logger = logging.getLogger("compiler.parser") 1676 self.assertFalse(logger.disabled) 1677 # Both will output a message 1678 logger.info(self.next_message()) 1679 logger.error(self.next_message()) 1680 logger = logging.getLogger("compiler.lexer") 1681 # Both will output a message 1682 logger.info(self.next_message()) 1683 logger.error(self.next_message()) 1684 # Will not appear 1685 hyphenated.critical(self.next_message()) 1686 self.assert_log_lines([ 1687 ('INFO', '4'), 1688 ('ERROR', '5'), 1689 ('INFO', '6'), 1690 ('ERROR', '7'), 1691 ], stream=output) 1692 # Original logger output is empty. 1693 self.assert_log_lines([]) 1694 1695 def test_config8_ok(self): 1696 1697 with self.check_no_resource_warning(): 1698 fn = make_temp_file(".log", "test_logging-X-") 1699 1700 # Replace single backslash with double backslash in windows 1701 # to avoid unicode error during string formatting 1702 if os.name == "nt": 1703 fn = fn.replace("\\", "\\\\") 1704 1705 config8 = self.config8.format(tempfile=fn) 1706 1707 self.apply_config(config8) 1708 self.apply_config(config8) 1709 1710 handler = logging.root.handlers[0] 1711 self.addCleanup(closeFileHandler, handler, fn) 1712 1713 def test_config9_ok(self): 1714 self.apply_config(self.config9) 1715 formatter = logging.root.handlers[0].formatter 1716 result = formatter.format(logging.makeLogRecord({'msg': 'test'})) 1717 self.assertEqual(result, 'test ++ defaultvalue') 1718 result = formatter.format(logging.makeLogRecord( 1719 {'msg': 'test', 'customfield': "customvalue"})) 1720 self.assertEqual(result, 'test ++ customvalue') 1721 1722 1723 def test_logger_disabling(self): 1724 self.apply_config(self.disable_test) 1725 logger = logging.getLogger('some_pristine_logger') 1726 self.assertFalse(logger.disabled) 1727 self.apply_config(self.disable_test) 1728 self.assertTrue(logger.disabled) 1729 self.apply_config(self.disable_test, disable_existing_loggers=False) 1730 self.assertFalse(logger.disabled) 1731 1732 def test_config_set_handler_names(self): 1733 test_config = """ 1734 [loggers] 1735 keys=root 1736 1737 [handlers] 1738 keys=hand1 1739 1740 [formatters] 1741 keys=form1 1742 1743 [logger_root] 1744 handlers=hand1 1745 1746 [handler_hand1] 1747 class=StreamHandler 1748 formatter=form1 1749 1750 [formatter_form1] 1751 format=%(levelname)s ++ %(message)s 1752 """ 1753 self.apply_config(test_config) 1754 self.assertEqual(logging.getLogger().handlers[0].name, 'hand1') 1755 1756 def test_exception_if_confg_file_is_invalid(self): 1757 test_config = """ 1758 [loggers] 1759 keys=root 1760 1761 [handlers] 1762 keys=hand1 1763 1764 [formatters] 1765 keys=form1 1766 1767 [logger_root] 1768 handlers=hand1 1769 1770 [handler_hand1] 1771 class=StreamHandler 1772 formatter=form1 1773 1774 [formatter_form1] 1775 format=%(levelname)s ++ %(message)s 1776 1777 prince 1778 """ 1779 1780 file = io.StringIO(textwrap.dedent(test_config)) 1781 self.assertRaises(RuntimeError, logging.config.fileConfig, file) 1782 1783 def test_exception_if_confg_file_is_empty(self): 1784 fd, fn = tempfile.mkstemp(prefix='test_empty_', suffix='.ini') 1785 os.close(fd) 1786 self.assertRaises(RuntimeError, logging.config.fileConfig, fn) 1787 os.remove(fn) 1788 1789 def test_exception_if_config_file_does_not_exist(self): 1790 self.assertRaises(FileNotFoundError, logging.config.fileConfig, 'filenotfound') 1791 1792 def test_defaults_do_no_interpolation(self): 1793 """bpo-33802 defaults should not get interpolated""" 1794 ini = textwrap.dedent(""" 1795 [formatters] 1796 keys=default 1797 1798 [formatter_default] 1799 1800 [handlers] 1801 keys=console 1802 1803 [handler_console] 1804 class=logging.StreamHandler 1805 args=tuple() 1806 1807 [loggers] 1808 keys=root 1809 1810 [logger_root] 1811 formatter=default 1812 handlers=console 1813 """).strip() 1814 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini') 1815 try: 1816 os.write(fd, ini.encode('ascii')) 1817 os.close(fd) 1818 logging.config.fileConfig( 1819 fn, 1820 encoding="utf-8", 1821 defaults=dict( 1822 version=1, 1823 disable_existing_loggers=False, 1824 formatters={ 1825 "generic": { 1826 "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s", 1827 "datefmt": "[%Y-%m-%d %H:%M:%S %z]", 1828 "class": "logging.Formatter" 1829 }, 1830 }, 1831 ) 1832 ) 1833 finally: 1834 os.unlink(fn) 1835 1836 1837@support.requires_working_socket() 1838@threading_helper.requires_working_threading() 1839class SocketHandlerTest(BaseTest): 1840 1841 """Test for SocketHandler objects.""" 1842 1843 server_class = TestTCPServer 1844 address = ('localhost', 0) 1845 1846 def setUp(self): 1847 """Set up a TCP server to receive log messages, and a SocketHandler 1848 pointing to that server's address and port.""" 1849 BaseTest.setUp(self) 1850 # Issue #29177: deal with errors that happen during setup 1851 self.server = self.sock_hdlr = self.server_exception = None 1852 try: 1853 self.server = server = self.server_class(self.address, 1854 self.handle_socket, 0.01) 1855 server.start() 1856 # Uncomment next line to test error recovery in setUp() 1857 # raise OSError('dummy error raised') 1858 except OSError as e: 1859 self.server_exception = e 1860 return 1861 server.ready.wait() 1862 hcls = logging.handlers.SocketHandler 1863 if isinstance(server.server_address, tuple): 1864 self.sock_hdlr = hcls('localhost', server.port) 1865 else: 1866 self.sock_hdlr = hcls(server.server_address, None) 1867 self.log_output = '' 1868 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1869 self.root_logger.addHandler(self.sock_hdlr) 1870 self.handled = threading.Semaphore(0) 1871 1872 def tearDown(self): 1873 """Shutdown the TCP server.""" 1874 try: 1875 if self.sock_hdlr: 1876 self.root_logger.removeHandler(self.sock_hdlr) 1877 self.sock_hdlr.close() 1878 if self.server: 1879 self.server.stop() 1880 finally: 1881 BaseTest.tearDown(self) 1882 1883 def handle_socket(self, request): 1884 conn = request.connection 1885 while True: 1886 chunk = conn.recv(4) 1887 if len(chunk) < 4: 1888 break 1889 slen = struct.unpack(">L", chunk)[0] 1890 chunk = conn.recv(slen) 1891 while len(chunk) < slen: 1892 chunk = chunk + conn.recv(slen - len(chunk)) 1893 obj = pickle.loads(chunk) 1894 record = logging.makeLogRecord(obj) 1895 self.log_output += record.msg + '\n' 1896 self.handled.release() 1897 1898 def test_output(self): 1899 # The log message sent to the SocketHandler is properly received. 1900 if self.server_exception: 1901 self.skipTest(self.server_exception) 1902 logger = logging.getLogger("tcp") 1903 logger.error("spam") 1904 self.handled.acquire() 1905 logger.debug("eggs") 1906 self.handled.acquire() 1907 self.assertEqual(self.log_output, "spam\neggs\n") 1908 1909 def test_noserver(self): 1910 if self.server_exception: 1911 self.skipTest(self.server_exception) 1912 # Avoid timing-related failures due to SocketHandler's own hard-wired 1913 # one-second timeout on socket.create_connection() (issue #16264). 1914 self.sock_hdlr.retryStart = 2.5 1915 # Kill the server 1916 self.server.stop() 1917 # The logging call should try to connect, which should fail 1918 try: 1919 raise RuntimeError('Deliberate mistake') 1920 except RuntimeError: 1921 self.root_logger.exception('Never sent') 1922 self.root_logger.error('Never sent, either') 1923 now = time.time() 1924 self.assertGreater(self.sock_hdlr.retryTime, now) 1925 time.sleep(self.sock_hdlr.retryTime - now + 0.001) 1926 self.root_logger.error('Nor this') 1927 1928 1929@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1930class UnixSocketHandlerTest(SocketHandlerTest): 1931 1932 """Test for SocketHandler with unix sockets.""" 1933 1934 if hasattr(socket, "AF_UNIX"): 1935 server_class = TestUnixStreamServer 1936 1937 def setUp(self): 1938 # override the definition in the base class 1939 self.address = socket_helper.create_unix_domain_name() 1940 self.addCleanup(os_helper.unlink, self.address) 1941 SocketHandlerTest.setUp(self) 1942 1943@support.requires_working_socket() 1944@threading_helper.requires_working_threading() 1945class DatagramHandlerTest(BaseTest): 1946 1947 """Test for DatagramHandler.""" 1948 1949 server_class = TestUDPServer 1950 address = ('localhost', 0) 1951 1952 def setUp(self): 1953 """Set up a UDP server to receive log messages, and a DatagramHandler 1954 pointing to that server's address and port.""" 1955 BaseTest.setUp(self) 1956 # Issue #29177: deal with errors that happen during setup 1957 self.server = self.sock_hdlr = self.server_exception = None 1958 try: 1959 self.server = server = self.server_class(self.address, 1960 self.handle_datagram, 0.01) 1961 server.start() 1962 # Uncomment next line to test error recovery in setUp() 1963 # raise OSError('dummy error raised') 1964 except OSError as e: 1965 self.server_exception = e 1966 return 1967 server.ready.wait() 1968 hcls = logging.handlers.DatagramHandler 1969 if isinstance(server.server_address, tuple): 1970 self.sock_hdlr = hcls('localhost', server.port) 1971 else: 1972 self.sock_hdlr = hcls(server.server_address, None) 1973 self.log_output = '' 1974 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1975 self.root_logger.addHandler(self.sock_hdlr) 1976 self.handled = threading.Event() 1977 1978 def tearDown(self): 1979 """Shutdown the UDP server.""" 1980 try: 1981 if self.server: 1982 self.server.stop() 1983 if self.sock_hdlr: 1984 self.root_logger.removeHandler(self.sock_hdlr) 1985 self.sock_hdlr.close() 1986 finally: 1987 BaseTest.tearDown(self) 1988 1989 def handle_datagram(self, request): 1990 slen = struct.pack('>L', 0) # length of prefix 1991 packet = request.packet[len(slen):] 1992 obj = pickle.loads(packet) 1993 record = logging.makeLogRecord(obj) 1994 self.log_output += record.msg + '\n' 1995 self.handled.set() 1996 1997 def test_output(self): 1998 # The log message sent to the DatagramHandler is properly received. 1999 if self.server_exception: 2000 self.skipTest(self.server_exception) 2001 logger = logging.getLogger("udp") 2002 logger.error("spam") 2003 self.handled.wait() 2004 self.handled.clear() 2005 logger.error("eggs") 2006 self.handled.wait() 2007 self.assertEqual(self.log_output, "spam\neggs\n") 2008 2009@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 2010class UnixDatagramHandlerTest(DatagramHandlerTest): 2011 2012 """Test for DatagramHandler using Unix sockets.""" 2013 2014 if hasattr(socket, "AF_UNIX"): 2015 server_class = TestUnixDatagramServer 2016 2017 def setUp(self): 2018 # override the definition in the base class 2019 self.address = socket_helper.create_unix_domain_name() 2020 self.addCleanup(os_helper.unlink, self.address) 2021 DatagramHandlerTest.setUp(self) 2022 2023@support.requires_working_socket() 2024@threading_helper.requires_working_threading() 2025class SysLogHandlerTest(BaseTest): 2026 2027 """Test for SysLogHandler using UDP.""" 2028 2029 server_class = TestUDPServer 2030 address = ('localhost', 0) 2031 2032 def setUp(self): 2033 """Set up a UDP server to receive log messages, and a SysLogHandler 2034 pointing to that server's address and port.""" 2035 BaseTest.setUp(self) 2036 # Issue #29177: deal with errors that happen during setup 2037 self.server = self.sl_hdlr = self.server_exception = None 2038 try: 2039 self.server = server = self.server_class(self.address, 2040 self.handle_datagram, 0.01) 2041 server.start() 2042 # Uncomment next line to test error recovery in setUp() 2043 # raise OSError('dummy error raised') 2044 except OSError as e: 2045 self.server_exception = e 2046 return 2047 server.ready.wait() 2048 hcls = logging.handlers.SysLogHandler 2049 if isinstance(server.server_address, tuple): 2050 self.sl_hdlr = hcls((server.server_address[0], server.port)) 2051 else: 2052 self.sl_hdlr = hcls(server.server_address) 2053 self.log_output = b'' 2054 self.root_logger.removeHandler(self.root_logger.handlers[0]) 2055 self.root_logger.addHandler(self.sl_hdlr) 2056 self.handled = threading.Event() 2057 2058 def tearDown(self): 2059 """Shutdown the server.""" 2060 try: 2061 if self.server: 2062 self.server.stop() 2063 if self.sl_hdlr: 2064 self.root_logger.removeHandler(self.sl_hdlr) 2065 self.sl_hdlr.close() 2066 finally: 2067 BaseTest.tearDown(self) 2068 2069 def handle_datagram(self, request): 2070 self.log_output = request.packet 2071 self.handled.set() 2072 2073 def test_output(self): 2074 if self.server_exception: 2075 self.skipTest(self.server_exception) 2076 # The log message sent to the SysLogHandler is properly received. 2077 logger = logging.getLogger("slh") 2078 logger.error("sp\xe4m") 2079 self.handled.wait(support.LONG_TIMEOUT) 2080 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 2081 self.handled.clear() 2082 self.sl_hdlr.append_nul = False 2083 logger.error("sp\xe4m") 2084 self.handled.wait(support.LONG_TIMEOUT) 2085 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') 2086 self.handled.clear() 2087 self.sl_hdlr.ident = "h\xe4m-" 2088 logger.error("sp\xe4m") 2089 self.handled.wait(support.LONG_TIMEOUT) 2090 self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') 2091 2092 def test_udp_reconnection(self): 2093 logger = logging.getLogger("slh") 2094 self.sl_hdlr.close() 2095 self.handled.clear() 2096 logger.error("sp\xe4m") 2097 self.handled.wait(support.LONG_TIMEOUT) 2098 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 2099 2100@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 2101class UnixSysLogHandlerTest(SysLogHandlerTest): 2102 2103 """Test for SysLogHandler with Unix sockets.""" 2104 2105 if hasattr(socket, "AF_UNIX"): 2106 server_class = TestUnixDatagramServer 2107 2108 def setUp(self): 2109 # override the definition in the base class 2110 self.address = socket_helper.create_unix_domain_name() 2111 self.addCleanup(os_helper.unlink, self.address) 2112 SysLogHandlerTest.setUp(self) 2113 2114@unittest.skipUnless(socket_helper.IPV6_ENABLED, 2115 'IPv6 support required for this test.') 2116class IPv6SysLogHandlerTest(SysLogHandlerTest): 2117 2118 """Test for SysLogHandler with IPv6 host.""" 2119 2120 server_class = TestUDPServer 2121 address = ('::1', 0) 2122 2123 def setUp(self): 2124 self.server_class.address_family = socket.AF_INET6 2125 super(IPv6SysLogHandlerTest, self).setUp() 2126 2127 def tearDown(self): 2128 self.server_class.address_family = socket.AF_INET 2129 super(IPv6SysLogHandlerTest, self).tearDown() 2130 2131@support.requires_working_socket() 2132@threading_helper.requires_working_threading() 2133class HTTPHandlerTest(BaseTest): 2134 """Test for HTTPHandler.""" 2135 2136 def setUp(self): 2137 """Set up an HTTP server to receive log messages, and a HTTPHandler 2138 pointing to that server's address and port.""" 2139 BaseTest.setUp(self) 2140 self.handled = threading.Event() 2141 2142 def handle_request(self, request): 2143 self.command = request.command 2144 self.log_data = urlparse(request.path) 2145 if self.command == 'POST': 2146 try: 2147 rlen = int(request.headers['Content-Length']) 2148 self.post_data = request.rfile.read(rlen) 2149 except: 2150 self.post_data = None 2151 request.send_response(200) 2152 request.end_headers() 2153 self.handled.set() 2154 2155 def test_output(self): 2156 # The log message sent to the HTTPHandler is properly received. 2157 logger = logging.getLogger("http") 2158 root_logger = self.root_logger 2159 root_logger.removeHandler(self.root_logger.handlers[0]) 2160 for secure in (False, True): 2161 addr = ('localhost', 0) 2162 if secure: 2163 try: 2164 import ssl 2165 except ImportError: 2166 sslctx = None 2167 else: 2168 here = os.path.dirname(__file__) 2169 localhost_cert = os.path.join(here, "certdata", "keycert.pem") 2170 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 2171 sslctx.load_cert_chain(localhost_cert) 2172 2173 context = ssl.create_default_context(cafile=localhost_cert) 2174 else: 2175 sslctx = None 2176 context = None 2177 self.server = server = TestHTTPServer(addr, self.handle_request, 2178 0.01, sslctx=sslctx) 2179 server.start() 2180 server.ready.wait() 2181 host = 'localhost:%d' % server.server_port 2182 secure_client = secure and sslctx 2183 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 2184 secure=secure_client, 2185 context=context, 2186 credentials=('foo', 'bar')) 2187 self.log_data = None 2188 root_logger.addHandler(self.h_hdlr) 2189 2190 for method in ('GET', 'POST'): 2191 self.h_hdlr.method = method 2192 self.handled.clear() 2193 msg = "sp\xe4m" 2194 logger.error(msg) 2195 handled = self.handled.wait(support.SHORT_TIMEOUT) 2196 self.assertTrue(handled, "HTTP request timed out") 2197 self.assertEqual(self.log_data.path, '/frob') 2198 self.assertEqual(self.command, method) 2199 if method == 'GET': 2200 d = parse_qs(self.log_data.query) 2201 else: 2202 d = parse_qs(self.post_data.decode('utf-8')) 2203 self.assertEqual(d['name'], ['http']) 2204 self.assertEqual(d['funcName'], ['test_output']) 2205 self.assertEqual(d['msg'], [msg]) 2206 2207 self.server.stop() 2208 self.root_logger.removeHandler(self.h_hdlr) 2209 self.h_hdlr.close() 2210 2211class MemoryTest(BaseTest): 2212 2213 """Test memory persistence of logger objects.""" 2214 2215 def setUp(self): 2216 """Create a dict to remember potentially destroyed objects.""" 2217 BaseTest.setUp(self) 2218 self._survivors = {} 2219 2220 def _watch_for_survival(self, *args): 2221 """Watch the given objects for survival, by creating weakrefs to 2222 them.""" 2223 for obj in args: 2224 key = id(obj), repr(obj) 2225 self._survivors[key] = weakref.ref(obj) 2226 2227 def _assertTruesurvival(self): 2228 """Assert that all objects watched for survival have survived.""" 2229 # Trigger cycle breaking. 2230 gc.collect() 2231 dead = [] 2232 for (id_, repr_), ref in self._survivors.items(): 2233 if ref() is None: 2234 dead.append(repr_) 2235 if dead: 2236 self.fail("%d objects should have survived " 2237 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 2238 2239 def test_persistent_loggers(self): 2240 # Logger objects are persistent and retain their configuration, even 2241 # if visible references are destroyed. 2242 self.root_logger.setLevel(logging.INFO) 2243 foo = logging.getLogger("foo") 2244 self._watch_for_survival(foo) 2245 foo.setLevel(logging.DEBUG) 2246 self.root_logger.debug(self.next_message()) 2247 foo.debug(self.next_message()) 2248 self.assert_log_lines([ 2249 ('foo', 'DEBUG', '2'), 2250 ]) 2251 del foo 2252 # foo has survived. 2253 self._assertTruesurvival() 2254 # foo has retained its settings. 2255 bar = logging.getLogger("foo") 2256 bar.debug(self.next_message()) 2257 self.assert_log_lines([ 2258 ('foo', 'DEBUG', '2'), 2259 ('foo', 'DEBUG', '3'), 2260 ]) 2261 2262 2263class EncodingTest(BaseTest): 2264 def test_encoding_plain_file(self): 2265 # In Python 2.x, a plain file object is treated as having no encoding. 2266 log = logging.getLogger("test") 2267 fn = make_temp_file(".log", "test_logging-1-") 2268 # the non-ascii data we write to the log. 2269 data = "foo\x80" 2270 try: 2271 handler = logging.FileHandler(fn, encoding="utf-8") 2272 log.addHandler(handler) 2273 try: 2274 # write non-ascii data to the log. 2275 log.warning(data) 2276 finally: 2277 log.removeHandler(handler) 2278 handler.close() 2279 # check we wrote exactly those bytes, ignoring trailing \n etc 2280 f = open(fn, encoding="utf-8") 2281 try: 2282 self.assertEqual(f.read().rstrip(), data) 2283 finally: 2284 f.close() 2285 finally: 2286 if os.path.isfile(fn): 2287 os.remove(fn) 2288 2289 def test_encoding_cyrillic_unicode(self): 2290 log = logging.getLogger("test") 2291 # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) 2292 message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' 2293 # Ensure it's written in a Cyrillic encoding 2294 writer_class = codecs.getwriter('cp1251') 2295 writer_class.encoding = 'cp1251' 2296 stream = io.BytesIO() 2297 writer = writer_class(stream, 'strict') 2298 handler = logging.StreamHandler(writer) 2299 log.addHandler(handler) 2300 try: 2301 log.warning(message) 2302 finally: 2303 log.removeHandler(handler) 2304 handler.close() 2305 # check we wrote exactly those bytes, ignoring trailing \n etc 2306 s = stream.getvalue() 2307 # Compare against what the data should be when encoded in CP-1251 2308 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') 2309 2310 2311class WarningsTest(BaseTest): 2312 2313 def test_warnings(self): 2314 with warnings.catch_warnings(): 2315 logging.captureWarnings(True) 2316 self.addCleanup(logging.captureWarnings, False) 2317 warnings.filterwarnings("always", category=UserWarning) 2318 stream = io.StringIO() 2319 h = logging.StreamHandler(stream) 2320 logger = logging.getLogger("py.warnings") 2321 logger.addHandler(h) 2322 warnings.warn("I'm warning you...") 2323 logger.removeHandler(h) 2324 s = stream.getvalue() 2325 h.close() 2326 self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) 2327 2328 # See if an explicit file uses the original implementation 2329 a_file = io.StringIO() 2330 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, 2331 a_file, "Dummy line") 2332 s = a_file.getvalue() 2333 a_file.close() 2334 self.assertEqual(s, 2335 "dummy.py:42: UserWarning: Explicit\n Dummy line\n") 2336 2337 def test_warnings_no_handlers(self): 2338 with warnings.catch_warnings(): 2339 logging.captureWarnings(True) 2340 self.addCleanup(logging.captureWarnings, False) 2341 2342 # confirm our assumption: no loggers are set 2343 logger = logging.getLogger("py.warnings") 2344 self.assertEqual(logger.handlers, []) 2345 2346 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) 2347 self.assertEqual(len(logger.handlers), 1) 2348 self.assertIsInstance(logger.handlers[0], logging.NullHandler) 2349 2350 2351def formatFunc(format, datefmt=None): 2352 return logging.Formatter(format, datefmt) 2353 2354class myCustomFormatter: 2355 def __init__(self, fmt, datefmt=None): 2356 pass 2357 2358def handlerFunc(): 2359 return logging.StreamHandler() 2360 2361class CustomHandler(logging.StreamHandler): 2362 pass 2363 2364class CustomListener(logging.handlers.QueueListener): 2365 pass 2366 2367class CustomQueue(queue.Queue): 2368 pass 2369 2370class CustomQueueProtocol: 2371 def __init__(self, maxsize=0): 2372 self.queue = queue.Queue(maxsize) 2373 2374 def __getattr__(self, attribute): 2375 queue = object.__getattribute__(self, 'queue') 2376 return getattr(queue, attribute) 2377 2378class CustomQueueFakeProtocol(CustomQueueProtocol): 2379 # An object implementing the minimial Queue API for 2380 # the logging module but with incorrect signatures. 2381 # 2382 # The object will be considered a valid queue class since we 2383 # do not check the signatures (only callability of methods) 2384 # but will NOT be usable in production since a TypeError will 2385 # be raised due to the extra argument in 'put_nowait'. 2386 def put_nowait(self): 2387 pass 2388 2389class CustomQueueWrongProtocol(CustomQueueProtocol): 2390 put_nowait = None 2391 2392class MinimalQueueProtocol: 2393 def put_nowait(self, x): pass 2394 def get(self): pass 2395 2396def queueMaker(): 2397 return queue.Queue() 2398 2399def listenerMaker(arg1, arg2, respect_handler_level=False): 2400 def func(queue, *handlers, **kwargs): 2401 kwargs.setdefault('respect_handler_level', respect_handler_level) 2402 return CustomListener(queue, *handlers, **kwargs) 2403 return func 2404 2405class ConfigDictTest(BaseTest): 2406 2407 """Reading logging config from a dictionary.""" 2408 2409 check_no_resource_warning = warnings_helper.check_no_resource_warning 2410 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 2411 2412 # config0 is a standard configuration. 2413 config0 = { 2414 'version': 1, 2415 'formatters': { 2416 'form1' : { 2417 'format' : '%(levelname)s ++ %(message)s', 2418 }, 2419 }, 2420 'handlers' : { 2421 'hand1' : { 2422 'class' : 'logging.StreamHandler', 2423 'formatter' : 'form1', 2424 'level' : 'NOTSET', 2425 'stream' : 'ext://sys.stdout', 2426 }, 2427 }, 2428 'root' : { 2429 'level' : 'WARNING', 2430 'handlers' : ['hand1'], 2431 }, 2432 } 2433 2434 # config1 adds a little to the standard configuration. 2435 config1 = { 2436 'version': 1, 2437 'formatters': { 2438 'form1' : { 2439 'format' : '%(levelname)s ++ %(message)s', 2440 }, 2441 }, 2442 'handlers' : { 2443 'hand1' : { 2444 'class' : 'logging.StreamHandler', 2445 'formatter' : 'form1', 2446 'level' : 'NOTSET', 2447 'stream' : 'ext://sys.stdout', 2448 }, 2449 }, 2450 'loggers' : { 2451 'compiler.parser' : { 2452 'level' : 'DEBUG', 2453 'handlers' : ['hand1'], 2454 }, 2455 }, 2456 'root' : { 2457 'level' : 'WARNING', 2458 }, 2459 } 2460 2461 # config1a moves the handler to the root. Used with config8a 2462 config1a = { 2463 'version': 1, 2464 'formatters': { 2465 'form1' : { 2466 'format' : '%(levelname)s ++ %(message)s', 2467 }, 2468 }, 2469 'handlers' : { 2470 'hand1' : { 2471 'class' : 'logging.StreamHandler', 2472 'formatter' : 'form1', 2473 'level' : 'NOTSET', 2474 'stream' : 'ext://sys.stdout', 2475 }, 2476 }, 2477 'loggers' : { 2478 'compiler.parser' : { 2479 'level' : 'DEBUG', 2480 }, 2481 }, 2482 'root' : { 2483 'level' : 'WARNING', 2484 'handlers' : ['hand1'], 2485 }, 2486 } 2487 2488 # config2 has a subtle configuration error that should be reported 2489 config2 = { 2490 'version': 1, 2491 'formatters': { 2492 'form1' : { 2493 'format' : '%(levelname)s ++ %(message)s', 2494 }, 2495 }, 2496 'handlers' : { 2497 'hand1' : { 2498 'class' : 'logging.StreamHandler', 2499 'formatter' : 'form1', 2500 'level' : 'NOTSET', 2501 'stream' : 'ext://sys.stdbout', 2502 }, 2503 }, 2504 'loggers' : { 2505 'compiler.parser' : { 2506 'level' : 'DEBUG', 2507 'handlers' : ['hand1'], 2508 }, 2509 }, 2510 'root' : { 2511 'level' : 'WARNING', 2512 }, 2513 } 2514 2515 # As config1 but with a misspelt level on a handler 2516 config2a = { 2517 'version': 1, 2518 'formatters': { 2519 'form1' : { 2520 'format' : '%(levelname)s ++ %(message)s', 2521 }, 2522 }, 2523 'handlers' : { 2524 'hand1' : { 2525 'class' : 'logging.StreamHandler', 2526 'formatter' : 'form1', 2527 'level' : 'NTOSET', 2528 'stream' : 'ext://sys.stdout', 2529 }, 2530 }, 2531 'loggers' : { 2532 'compiler.parser' : { 2533 'level' : 'DEBUG', 2534 'handlers' : ['hand1'], 2535 }, 2536 }, 2537 'root' : { 2538 'level' : 'WARNING', 2539 }, 2540 } 2541 2542 2543 # As config1 but with a misspelt level on a logger 2544 config2b = { 2545 'version': 1, 2546 'formatters': { 2547 'form1' : { 2548 'format' : '%(levelname)s ++ %(message)s', 2549 }, 2550 }, 2551 'handlers' : { 2552 'hand1' : { 2553 'class' : 'logging.StreamHandler', 2554 'formatter' : 'form1', 2555 'level' : 'NOTSET', 2556 'stream' : 'ext://sys.stdout', 2557 }, 2558 }, 2559 'loggers' : { 2560 'compiler.parser' : { 2561 'level' : 'DEBUG', 2562 'handlers' : ['hand1'], 2563 }, 2564 }, 2565 'root' : { 2566 'level' : 'WRANING', 2567 }, 2568 } 2569 2570 # config3 has a less subtle configuration error 2571 config3 = { 2572 'version': 1, 2573 'formatters': { 2574 'form1' : { 2575 'format' : '%(levelname)s ++ %(message)s', 2576 }, 2577 }, 2578 'handlers' : { 2579 'hand1' : { 2580 'class' : 'logging.StreamHandler', 2581 'formatter' : 'misspelled_name', 2582 'level' : 'NOTSET', 2583 'stream' : 'ext://sys.stdout', 2584 }, 2585 }, 2586 'loggers' : { 2587 'compiler.parser' : { 2588 'level' : 'DEBUG', 2589 'handlers' : ['hand1'], 2590 }, 2591 }, 2592 'root' : { 2593 'level' : 'WARNING', 2594 }, 2595 } 2596 2597 # config4 specifies a custom formatter class to be loaded 2598 config4 = { 2599 'version': 1, 2600 'formatters': { 2601 'form1' : { 2602 '()' : __name__ + '.ExceptionFormatter', 2603 'format' : '%(levelname)s:%(name)s:%(message)s', 2604 }, 2605 }, 2606 'handlers' : { 2607 'hand1' : { 2608 'class' : 'logging.StreamHandler', 2609 'formatter' : 'form1', 2610 'level' : 'NOTSET', 2611 'stream' : 'ext://sys.stdout', 2612 }, 2613 }, 2614 'root' : { 2615 'level' : 'NOTSET', 2616 'handlers' : ['hand1'], 2617 }, 2618 } 2619 2620 # As config4 but using an actual callable rather than a string 2621 config4a = { 2622 'version': 1, 2623 'formatters': { 2624 'form1' : { 2625 '()' : ExceptionFormatter, 2626 'format' : '%(levelname)s:%(name)s:%(message)s', 2627 }, 2628 'form2' : { 2629 '()' : __name__ + '.formatFunc', 2630 'format' : '%(levelname)s:%(name)s:%(message)s', 2631 }, 2632 'form3' : { 2633 '()' : formatFunc, 2634 'format' : '%(levelname)s:%(name)s:%(message)s', 2635 }, 2636 }, 2637 'handlers' : { 2638 'hand1' : { 2639 'class' : 'logging.StreamHandler', 2640 'formatter' : 'form1', 2641 'level' : 'NOTSET', 2642 'stream' : 'ext://sys.stdout', 2643 }, 2644 'hand2' : { 2645 '()' : handlerFunc, 2646 }, 2647 }, 2648 'root' : { 2649 'level' : 'NOTSET', 2650 'handlers' : ['hand1'], 2651 }, 2652 } 2653 2654 # config5 specifies a custom handler class to be loaded 2655 config5 = { 2656 'version': 1, 2657 'formatters': { 2658 'form1' : { 2659 'format' : '%(levelname)s ++ %(message)s', 2660 }, 2661 }, 2662 'handlers' : { 2663 'hand1' : { 2664 'class' : __name__ + '.CustomHandler', 2665 'formatter' : 'form1', 2666 'level' : 'NOTSET', 2667 'stream' : 'ext://sys.stdout', 2668 }, 2669 }, 2670 'loggers' : { 2671 'compiler.parser' : { 2672 'level' : 'DEBUG', 2673 'handlers' : ['hand1'], 2674 }, 2675 }, 2676 'root' : { 2677 'level' : 'WARNING', 2678 }, 2679 } 2680 2681 # config6 specifies a custom handler class to be loaded 2682 # but has bad arguments 2683 config6 = { 2684 'version': 1, 2685 'formatters': { 2686 'form1' : { 2687 'format' : '%(levelname)s ++ %(message)s', 2688 }, 2689 }, 2690 'handlers' : { 2691 'hand1' : { 2692 'class' : __name__ + '.CustomHandler', 2693 'formatter' : 'form1', 2694 'level' : 'NOTSET', 2695 'stream' : 'ext://sys.stdout', 2696 '9' : 'invalid parameter name', 2697 }, 2698 }, 2699 'loggers' : { 2700 'compiler.parser' : { 2701 'level' : 'DEBUG', 2702 'handlers' : ['hand1'], 2703 }, 2704 }, 2705 'root' : { 2706 'level' : 'WARNING', 2707 }, 2708 } 2709 2710 # config 7 does not define compiler.parser but defines compiler.lexer 2711 # so compiler.parser should be disabled after applying it 2712 config7 = { 2713 'version': 1, 2714 'formatters': { 2715 'form1' : { 2716 'format' : '%(levelname)s ++ %(message)s', 2717 }, 2718 }, 2719 'handlers' : { 2720 'hand1' : { 2721 'class' : 'logging.StreamHandler', 2722 'formatter' : 'form1', 2723 'level' : 'NOTSET', 2724 'stream' : 'ext://sys.stdout', 2725 }, 2726 }, 2727 'loggers' : { 2728 'compiler.lexer' : { 2729 'level' : 'DEBUG', 2730 'handlers' : ['hand1'], 2731 }, 2732 }, 2733 'root' : { 2734 'level' : 'WARNING', 2735 }, 2736 } 2737 2738 # config8 defines both compiler and compiler.lexer 2739 # so compiler.parser should not be disabled (since 2740 # compiler is defined) 2741 config8 = { 2742 'version': 1, 2743 'disable_existing_loggers' : False, 2744 'formatters': { 2745 'form1' : { 2746 'format' : '%(levelname)s ++ %(message)s', 2747 }, 2748 }, 2749 'handlers' : { 2750 'hand1' : { 2751 'class' : 'logging.StreamHandler', 2752 'formatter' : 'form1', 2753 'level' : 'NOTSET', 2754 'stream' : 'ext://sys.stdout', 2755 }, 2756 }, 2757 'loggers' : { 2758 'compiler' : { 2759 'level' : 'DEBUG', 2760 'handlers' : ['hand1'], 2761 }, 2762 'compiler.lexer' : { 2763 }, 2764 }, 2765 'root' : { 2766 'level' : 'WARNING', 2767 }, 2768 } 2769 2770 # config8a disables existing loggers 2771 config8a = { 2772 'version': 1, 2773 'disable_existing_loggers' : True, 2774 'formatters': { 2775 'form1' : { 2776 'format' : '%(levelname)s ++ %(message)s', 2777 }, 2778 }, 2779 'handlers' : { 2780 'hand1' : { 2781 'class' : 'logging.StreamHandler', 2782 'formatter' : 'form1', 2783 'level' : 'NOTSET', 2784 'stream' : 'ext://sys.stdout', 2785 }, 2786 }, 2787 'loggers' : { 2788 'compiler' : { 2789 'level' : 'DEBUG', 2790 'handlers' : ['hand1'], 2791 }, 2792 'compiler.lexer' : { 2793 }, 2794 }, 2795 'root' : { 2796 'level' : 'WARNING', 2797 }, 2798 } 2799 2800 config9 = { 2801 'version': 1, 2802 'formatters': { 2803 'form1' : { 2804 'format' : '%(levelname)s ++ %(message)s', 2805 }, 2806 }, 2807 'handlers' : { 2808 'hand1' : { 2809 'class' : 'logging.StreamHandler', 2810 'formatter' : 'form1', 2811 'level' : 'WARNING', 2812 'stream' : 'ext://sys.stdout', 2813 }, 2814 }, 2815 'loggers' : { 2816 'compiler.parser' : { 2817 'level' : 'WARNING', 2818 'handlers' : ['hand1'], 2819 }, 2820 }, 2821 'root' : { 2822 'level' : 'NOTSET', 2823 }, 2824 } 2825 2826 config9a = { 2827 'version': 1, 2828 'incremental' : True, 2829 'handlers' : { 2830 'hand1' : { 2831 'level' : 'WARNING', 2832 }, 2833 }, 2834 'loggers' : { 2835 'compiler.parser' : { 2836 'level' : 'INFO', 2837 }, 2838 }, 2839 } 2840 2841 config9b = { 2842 'version': 1, 2843 'incremental' : True, 2844 'handlers' : { 2845 'hand1' : { 2846 'level' : 'INFO', 2847 }, 2848 }, 2849 'loggers' : { 2850 'compiler.parser' : { 2851 'level' : 'INFO', 2852 }, 2853 }, 2854 } 2855 2856 # As config1 but with a filter added 2857 config10 = { 2858 'version': 1, 2859 'formatters': { 2860 'form1' : { 2861 'format' : '%(levelname)s ++ %(message)s', 2862 }, 2863 }, 2864 'filters' : { 2865 'filt1' : { 2866 'name' : 'compiler.parser', 2867 }, 2868 }, 2869 'handlers' : { 2870 'hand1' : { 2871 'class' : 'logging.StreamHandler', 2872 'formatter' : 'form1', 2873 'level' : 'NOTSET', 2874 'stream' : 'ext://sys.stdout', 2875 'filters' : ['filt1'], 2876 }, 2877 }, 2878 'loggers' : { 2879 'compiler.parser' : { 2880 'level' : 'DEBUG', 2881 'filters' : ['filt1'], 2882 }, 2883 }, 2884 'root' : { 2885 'level' : 'WARNING', 2886 'handlers' : ['hand1'], 2887 }, 2888 } 2889 2890 # As config1 but using cfg:// references 2891 config11 = { 2892 'version': 1, 2893 'true_formatters': { 2894 'form1' : { 2895 'format' : '%(levelname)s ++ %(message)s', 2896 }, 2897 }, 2898 'handler_configs': { 2899 'hand1' : { 2900 'class' : 'logging.StreamHandler', 2901 'formatter' : 'form1', 2902 'level' : 'NOTSET', 2903 'stream' : 'ext://sys.stdout', 2904 }, 2905 }, 2906 'formatters' : 'cfg://true_formatters', 2907 'handlers' : { 2908 'hand1' : 'cfg://handler_configs[hand1]', 2909 }, 2910 'loggers' : { 2911 'compiler.parser' : { 2912 'level' : 'DEBUG', 2913 'handlers' : ['hand1'], 2914 }, 2915 }, 2916 'root' : { 2917 'level' : 'WARNING', 2918 }, 2919 } 2920 2921 # As config11 but missing the version key 2922 config12 = { 2923 'true_formatters': { 2924 'form1' : { 2925 'format' : '%(levelname)s ++ %(message)s', 2926 }, 2927 }, 2928 'handler_configs': { 2929 'hand1' : { 2930 'class' : 'logging.StreamHandler', 2931 'formatter' : 'form1', 2932 'level' : 'NOTSET', 2933 'stream' : 'ext://sys.stdout', 2934 }, 2935 }, 2936 'formatters' : 'cfg://true_formatters', 2937 'handlers' : { 2938 'hand1' : 'cfg://handler_configs[hand1]', 2939 }, 2940 'loggers' : { 2941 'compiler.parser' : { 2942 'level' : 'DEBUG', 2943 'handlers' : ['hand1'], 2944 }, 2945 }, 2946 'root' : { 2947 'level' : 'WARNING', 2948 }, 2949 } 2950 2951 # As config11 but using an unsupported version 2952 config13 = { 2953 'version': 2, 2954 'true_formatters': { 2955 'form1' : { 2956 'format' : '%(levelname)s ++ %(message)s', 2957 }, 2958 }, 2959 'handler_configs': { 2960 'hand1' : { 2961 'class' : 'logging.StreamHandler', 2962 'formatter' : 'form1', 2963 'level' : 'NOTSET', 2964 'stream' : 'ext://sys.stdout', 2965 }, 2966 }, 2967 'formatters' : 'cfg://true_formatters', 2968 'handlers' : { 2969 'hand1' : 'cfg://handler_configs[hand1]', 2970 }, 2971 'loggers' : { 2972 'compiler.parser' : { 2973 'level' : 'DEBUG', 2974 'handlers' : ['hand1'], 2975 }, 2976 }, 2977 'root' : { 2978 'level' : 'WARNING', 2979 }, 2980 } 2981 2982 # As config0, but with properties 2983 config14 = { 2984 'version': 1, 2985 'formatters': { 2986 'form1' : { 2987 'format' : '%(levelname)s ++ %(message)s', 2988 }, 2989 }, 2990 'handlers' : { 2991 'hand1' : { 2992 'class' : 'logging.StreamHandler', 2993 'formatter' : 'form1', 2994 'level' : 'NOTSET', 2995 'stream' : 'ext://sys.stdout', 2996 '.': { 2997 'foo': 'bar', 2998 'terminator': '!\n', 2999 } 3000 }, 3001 }, 3002 'root' : { 3003 'level' : 'WARNING', 3004 'handlers' : ['hand1'], 3005 }, 3006 } 3007 3008 # config0 but with default values for formatter. Skipped 15, it is defined 3009 # in the test code. 3010 config16 = { 3011 'version': 1, 3012 'formatters': { 3013 'form1' : { 3014 'format' : '%(message)s ++ %(customfield)s', 3015 'defaults': {"customfield": "defaultvalue"} 3016 }, 3017 }, 3018 'handlers' : { 3019 'hand1' : { 3020 'class' : 'logging.StreamHandler', 3021 'formatter' : 'form1', 3022 'level' : 'NOTSET', 3023 'stream' : 'ext://sys.stdout', 3024 }, 3025 }, 3026 'root' : { 3027 'level' : 'WARNING', 3028 'handlers' : ['hand1'], 3029 }, 3030 } 3031 3032 class CustomFormatter(logging.Formatter): 3033 custom_property = "." 3034 3035 def format(self, record): 3036 return super().format(record) 3037 3038 config17 = { 3039 'version': 1, 3040 'formatters': { 3041 "custom": { 3042 "()": CustomFormatter, 3043 "style": "{", 3044 "datefmt": "%Y-%m-%d %H:%M:%S", 3045 "format": "{message}", # <-- to force an exception when configuring 3046 ".": { 3047 "custom_property": "value" 3048 } 3049 } 3050 }, 3051 'handlers' : { 3052 'hand1' : { 3053 'class' : 'logging.StreamHandler', 3054 'formatter' : 'custom', 3055 'level' : 'NOTSET', 3056 'stream' : 'ext://sys.stdout', 3057 }, 3058 }, 3059 'root' : { 3060 'level' : 'WARNING', 3061 'handlers' : ['hand1'], 3062 }, 3063 } 3064 3065 config18 = { 3066 "version": 1, 3067 "handlers": { 3068 "console": { 3069 "class": "logging.StreamHandler", 3070 "level": "DEBUG", 3071 }, 3072 "buffering": { 3073 "class": "logging.handlers.MemoryHandler", 3074 "capacity": 5, 3075 "target": "console", 3076 "level": "DEBUG", 3077 "flushLevel": "ERROR" 3078 } 3079 }, 3080 "loggers": { 3081 "mymodule": { 3082 "level": "DEBUG", 3083 "handlers": ["buffering"], 3084 "propagate": "true" 3085 } 3086 } 3087 } 3088 3089 bad_format = { 3090 "version": 1, 3091 "formatters": { 3092 "mySimpleFormatter": { 3093 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 3094 "style": "$" 3095 } 3096 }, 3097 "handlers": { 3098 "fileGlobal": { 3099 "class": "logging.StreamHandler", 3100 "level": "DEBUG", 3101 "formatter": "mySimpleFormatter" 3102 }, 3103 "bufferGlobal": { 3104 "class": "logging.handlers.MemoryHandler", 3105 "capacity": 5, 3106 "formatter": "mySimpleFormatter", 3107 "target": "fileGlobal", 3108 "level": "DEBUG" 3109 } 3110 }, 3111 "loggers": { 3112 "mymodule": { 3113 "level": "DEBUG", 3114 "handlers": ["bufferGlobal"], 3115 "propagate": "true" 3116 } 3117 } 3118 } 3119 3120 # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False 3121 custom_formatter_class_validate = { 3122 'version': 1, 3123 'formatters': { 3124 'form1': { 3125 '()': __name__ + '.ExceptionFormatter', 3126 'format': '%(levelname)s:%(name)s:%(message)s', 3127 'validate': False, 3128 }, 3129 }, 3130 'handlers' : { 3131 'hand1' : { 3132 'class': 'logging.StreamHandler', 3133 'formatter': 'form1', 3134 'level': 'NOTSET', 3135 'stream': 'ext://sys.stdout', 3136 }, 3137 }, 3138 "loggers": { 3139 "my_test_logger_custom_formatter": { 3140 "level": "DEBUG", 3141 "handlers": ["hand1"], 3142 "propagate": "true" 3143 } 3144 } 3145 } 3146 3147 # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False 3148 custom_formatter_class_validate2 = { 3149 'version': 1, 3150 'formatters': { 3151 'form1': { 3152 'class': __name__ + '.ExceptionFormatter', 3153 'format': '%(levelname)s:%(name)s:%(message)s', 3154 'validate': False, 3155 }, 3156 }, 3157 'handlers' : { 3158 'hand1' : { 3159 'class': 'logging.StreamHandler', 3160 'formatter': 'form1', 3161 'level': 'NOTSET', 3162 'stream': 'ext://sys.stdout', 3163 }, 3164 }, 3165 "loggers": { 3166 "my_test_logger_custom_formatter": { 3167 "level": "DEBUG", 3168 "handlers": ["hand1"], 3169 "propagate": "true" 3170 } 3171 } 3172 } 3173 3174 # Configuration with custom class that is not inherited from logging.Formatter 3175 custom_formatter_class_validate3 = { 3176 'version': 1, 3177 'formatters': { 3178 'form1': { 3179 'class': __name__ + '.myCustomFormatter', 3180 'format': '%(levelname)s:%(name)s:%(message)s', 3181 'validate': False, 3182 }, 3183 }, 3184 'handlers' : { 3185 'hand1' : { 3186 'class': 'logging.StreamHandler', 3187 'formatter': 'form1', 3188 'level': 'NOTSET', 3189 'stream': 'ext://sys.stdout', 3190 }, 3191 }, 3192 "loggers": { 3193 "my_test_logger_custom_formatter": { 3194 "level": "DEBUG", 3195 "handlers": ["hand1"], 3196 "propagate": "true" 3197 } 3198 } 3199 } 3200 3201 # Configuration with custom function, 'validate' set to False and no defaults 3202 custom_formatter_with_function = { 3203 'version': 1, 3204 'formatters': { 3205 'form1': { 3206 '()': formatFunc, 3207 'format': '%(levelname)s:%(name)s:%(message)s', 3208 'validate': False, 3209 }, 3210 }, 3211 'handlers' : { 3212 'hand1' : { 3213 'class': 'logging.StreamHandler', 3214 'formatter': 'form1', 3215 'level': 'NOTSET', 3216 'stream': 'ext://sys.stdout', 3217 }, 3218 }, 3219 "loggers": { 3220 "my_test_logger_custom_formatter": { 3221 "level": "DEBUG", 3222 "handlers": ["hand1"], 3223 "propagate": "true" 3224 } 3225 } 3226 } 3227 3228 # Configuration with custom function, and defaults 3229 custom_formatter_with_defaults = { 3230 'version': 1, 3231 'formatters': { 3232 'form1': { 3233 '()': formatFunc, 3234 'format': '%(levelname)s:%(name)s:%(message)s:%(customfield)s', 3235 'defaults': {"customfield": "myvalue"} 3236 }, 3237 }, 3238 'handlers' : { 3239 'hand1' : { 3240 'class': 'logging.StreamHandler', 3241 'formatter': 'form1', 3242 'level': 'NOTSET', 3243 'stream': 'ext://sys.stdout', 3244 }, 3245 }, 3246 "loggers": { 3247 "my_test_logger_custom_formatter": { 3248 "level": "DEBUG", 3249 "handlers": ["hand1"], 3250 "propagate": "true" 3251 } 3252 } 3253 } 3254 3255 config_queue_handler = { 3256 'version': 1, 3257 'handlers' : { 3258 'h1' : { 3259 'class': 'logging.FileHandler', 3260 }, 3261 # key is before depended on handlers to test that deferred config works 3262 'ah' : { 3263 'class': 'logging.handlers.QueueHandler', 3264 'handlers': ['h1'] 3265 }, 3266 }, 3267 "root": { 3268 "level": "DEBUG", 3269 "handlers": ["ah"] 3270 } 3271 } 3272 3273 def apply_config(self, conf): 3274 logging.config.dictConfig(conf) 3275 3276 def check_handler(self, name, cls): 3277 h = logging.getHandlerByName(name) 3278 self.assertIsInstance(h, cls) 3279 3280 def test_config0_ok(self): 3281 # A simple config which overrides the default settings. 3282 with support.captured_stdout() as output: 3283 self.apply_config(self.config0) 3284 self.check_handler('hand1', logging.StreamHandler) 3285 logger = logging.getLogger() 3286 # Won't output anything 3287 logger.info(self.next_message()) 3288 # Outputs a message 3289 logger.error(self.next_message()) 3290 self.assert_log_lines([ 3291 ('ERROR', '2'), 3292 ], stream=output) 3293 # Original logger output is empty. 3294 self.assert_log_lines([]) 3295 3296 def test_config1_ok(self, config=config1): 3297 # A config defining a sub-parser as well. 3298 with support.captured_stdout() as output: 3299 self.apply_config(config) 3300 logger = logging.getLogger("compiler.parser") 3301 # Both will output a message 3302 logger.info(self.next_message()) 3303 logger.error(self.next_message()) 3304 self.assert_log_lines([ 3305 ('INFO', '1'), 3306 ('ERROR', '2'), 3307 ], stream=output) 3308 # Original logger output is empty. 3309 self.assert_log_lines([]) 3310 3311 def test_config2_failure(self): 3312 # A simple config which overrides the default settings. 3313 self.assertRaises(Exception, self.apply_config, self.config2) 3314 3315 def test_config2a_failure(self): 3316 # A simple config which overrides the default settings. 3317 self.assertRaises(Exception, self.apply_config, self.config2a) 3318 3319 def test_config2b_failure(self): 3320 # A simple config which overrides the default settings. 3321 self.assertRaises(Exception, self.apply_config, self.config2b) 3322 3323 def test_config3_failure(self): 3324 # A simple config which overrides the default settings. 3325 self.assertRaises(Exception, self.apply_config, self.config3) 3326 3327 def test_config4_ok(self): 3328 # A config specifying a custom formatter class. 3329 with support.captured_stdout() as output: 3330 self.apply_config(self.config4) 3331 self.check_handler('hand1', logging.StreamHandler) 3332 #logger = logging.getLogger() 3333 try: 3334 raise RuntimeError() 3335 except RuntimeError: 3336 logging.exception("just testing") 3337 sys.stdout.seek(0) 3338 self.assertEqual(output.getvalue(), 3339 "ERROR:root:just testing\nGot a [RuntimeError]\n") 3340 # Original logger output is empty 3341 self.assert_log_lines([]) 3342 3343 def test_config4a_ok(self): 3344 # A config specifying a custom formatter class. 3345 with support.captured_stdout() as output: 3346 self.apply_config(self.config4a) 3347 #logger = logging.getLogger() 3348 try: 3349 raise RuntimeError() 3350 except RuntimeError: 3351 logging.exception("just testing") 3352 sys.stdout.seek(0) 3353 self.assertEqual(output.getvalue(), 3354 "ERROR:root:just testing\nGot a [RuntimeError]\n") 3355 # Original logger output is empty 3356 self.assert_log_lines([]) 3357 3358 def test_config5_ok(self): 3359 self.test_config1_ok(config=self.config5) 3360 self.check_handler('hand1', CustomHandler) 3361 3362 def test_config6_failure(self): 3363 self.assertRaises(Exception, self.apply_config, self.config6) 3364 3365 def test_config7_ok(self): 3366 with support.captured_stdout() as output: 3367 self.apply_config(self.config1) 3368 logger = logging.getLogger("compiler.parser") 3369 # Both will output a message 3370 logger.info(self.next_message()) 3371 logger.error(self.next_message()) 3372 self.assert_log_lines([ 3373 ('INFO', '1'), 3374 ('ERROR', '2'), 3375 ], stream=output) 3376 # Original logger output is empty. 3377 self.assert_log_lines([]) 3378 with support.captured_stdout() as output: 3379 self.apply_config(self.config7) 3380 self.check_handler('hand1', logging.StreamHandler) 3381 logger = logging.getLogger("compiler.parser") 3382 self.assertTrue(logger.disabled) 3383 logger = logging.getLogger("compiler.lexer") 3384 # Both will output a message 3385 logger.info(self.next_message()) 3386 logger.error(self.next_message()) 3387 self.assert_log_lines([ 3388 ('INFO', '3'), 3389 ('ERROR', '4'), 3390 ], stream=output) 3391 # Original logger output is empty. 3392 self.assert_log_lines([]) 3393 3394 # Same as test_config_7_ok but don't disable old loggers. 3395 def test_config_8_ok(self): 3396 with support.captured_stdout() as output: 3397 self.apply_config(self.config1) 3398 logger = logging.getLogger("compiler.parser") 3399 # All will output a message 3400 logger.info(self.next_message()) 3401 logger.error(self.next_message()) 3402 self.assert_log_lines([ 3403 ('INFO', '1'), 3404 ('ERROR', '2'), 3405 ], stream=output) 3406 # Original logger output is empty. 3407 self.assert_log_lines([]) 3408 with support.captured_stdout() as output: 3409 self.apply_config(self.config8) 3410 self.check_handler('hand1', logging.StreamHandler) 3411 logger = logging.getLogger("compiler.parser") 3412 self.assertFalse(logger.disabled) 3413 # Both will output a message 3414 logger.info(self.next_message()) 3415 logger.error(self.next_message()) 3416 logger = logging.getLogger("compiler.lexer") 3417 # Both will output a message 3418 logger.info(self.next_message()) 3419 logger.error(self.next_message()) 3420 self.assert_log_lines([ 3421 ('INFO', '3'), 3422 ('ERROR', '4'), 3423 ('INFO', '5'), 3424 ('ERROR', '6'), 3425 ], stream=output) 3426 # Original logger output is empty. 3427 self.assert_log_lines([]) 3428 3429 def test_config_8a_ok(self): 3430 with support.captured_stdout() as output: 3431 self.apply_config(self.config1a) 3432 self.check_handler('hand1', logging.StreamHandler) 3433 logger = logging.getLogger("compiler.parser") 3434 # See issue #11424. compiler-hyphenated sorts 3435 # between compiler and compiler.xyz and this 3436 # was preventing compiler.xyz from being included 3437 # in the child loggers of compiler because of an 3438 # overzealous loop termination condition. 3439 hyphenated = logging.getLogger('compiler-hyphenated') 3440 # All will output a message 3441 logger.info(self.next_message()) 3442 logger.error(self.next_message()) 3443 hyphenated.critical(self.next_message()) 3444 self.assert_log_lines([ 3445 ('INFO', '1'), 3446 ('ERROR', '2'), 3447 ('CRITICAL', '3'), 3448 ], stream=output) 3449 # Original logger output is empty. 3450 self.assert_log_lines([]) 3451 with support.captured_stdout() as output: 3452 self.apply_config(self.config8a) 3453 self.check_handler('hand1', logging.StreamHandler) 3454 logger = logging.getLogger("compiler.parser") 3455 self.assertFalse(logger.disabled) 3456 # Both will output a message 3457 logger.info(self.next_message()) 3458 logger.error(self.next_message()) 3459 logger = logging.getLogger("compiler.lexer") 3460 # Both will output a message 3461 logger.info(self.next_message()) 3462 logger.error(self.next_message()) 3463 # Will not appear 3464 hyphenated.critical(self.next_message()) 3465 self.assert_log_lines([ 3466 ('INFO', '4'), 3467 ('ERROR', '5'), 3468 ('INFO', '6'), 3469 ('ERROR', '7'), 3470 ], stream=output) 3471 # Original logger output is empty. 3472 self.assert_log_lines([]) 3473 3474 def test_config_9_ok(self): 3475 with support.captured_stdout() as output: 3476 self.apply_config(self.config9) 3477 self.check_handler('hand1', logging.StreamHandler) 3478 logger = logging.getLogger("compiler.parser") 3479 # Nothing will be output since both handler and logger are set to WARNING 3480 logger.info(self.next_message()) 3481 self.assert_log_lines([], stream=output) 3482 self.apply_config(self.config9a) 3483 # Nothing will be output since handler is still set to WARNING 3484 logger.info(self.next_message()) 3485 self.assert_log_lines([], stream=output) 3486 self.apply_config(self.config9b) 3487 # Message should now be output 3488 logger.info(self.next_message()) 3489 self.assert_log_lines([ 3490 ('INFO', '3'), 3491 ], stream=output) 3492 3493 def test_config_10_ok(self): 3494 with support.captured_stdout() as output: 3495 self.apply_config(self.config10) 3496 self.check_handler('hand1', logging.StreamHandler) 3497 logger = logging.getLogger("compiler.parser") 3498 logger.warning(self.next_message()) 3499 logger = logging.getLogger('compiler') 3500 # Not output, because filtered 3501 logger.warning(self.next_message()) 3502 logger = logging.getLogger('compiler.lexer') 3503 # Not output, because filtered 3504 logger.warning(self.next_message()) 3505 logger = logging.getLogger("compiler.parser.codegen") 3506 # Output, as not filtered 3507 logger.error(self.next_message()) 3508 self.assert_log_lines([ 3509 ('WARNING', '1'), 3510 ('ERROR', '4'), 3511 ], stream=output) 3512 3513 def test_config11_ok(self): 3514 self.test_config1_ok(self.config11) 3515 3516 def test_config12_failure(self): 3517 self.assertRaises(Exception, self.apply_config, self.config12) 3518 3519 def test_config13_failure(self): 3520 self.assertRaises(Exception, self.apply_config, self.config13) 3521 3522 def test_config14_ok(self): 3523 with support.captured_stdout() as output: 3524 self.apply_config(self.config14) 3525 h = logging._handlers['hand1'] 3526 self.assertEqual(h.foo, 'bar') 3527 self.assertEqual(h.terminator, '!\n') 3528 logging.warning('Exclamation') 3529 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 3530 3531 def test_config15_ok(self): 3532 3533 with self.check_no_resource_warning(): 3534 fn = make_temp_file(".log", "test_logging-X-") 3535 3536 config = { 3537 "version": 1, 3538 "handlers": { 3539 "file": { 3540 "class": "logging.FileHandler", 3541 "filename": fn, 3542 "encoding": "utf-8", 3543 } 3544 }, 3545 "root": { 3546 "handlers": ["file"] 3547 } 3548 } 3549 3550 self.apply_config(config) 3551 self.apply_config(config) 3552 3553 handler = logging.root.handlers[0] 3554 self.addCleanup(closeFileHandler, handler, fn) 3555 3556 def test_config16_ok(self): 3557 self.apply_config(self.config16) 3558 h = logging._handlers['hand1'] 3559 3560 # Custom value 3561 result = h.formatter.format(logging.makeLogRecord( 3562 {'msg': 'Hello', 'customfield': 'customvalue'})) 3563 self.assertEqual(result, 'Hello ++ customvalue') 3564 3565 # Default value 3566 result = h.formatter.format(logging.makeLogRecord( 3567 {'msg': 'Hello'})) 3568 self.assertEqual(result, 'Hello ++ defaultvalue') 3569 3570 def test_config17_ok(self): 3571 self.apply_config(self.config17) 3572 h = logging._handlers['hand1'] 3573 self.assertEqual(h.formatter.custom_property, 'value') 3574 3575 def test_config18_ok(self): 3576 self.apply_config(self.config18) 3577 handler = logging.getLogger('mymodule').handlers[0] 3578 self.assertEqual(handler.flushLevel, logging.ERROR) 3579 3580 def setup_via_listener(self, text, verify=None): 3581 text = text.encode("utf-8") 3582 # Ask for a randomly assigned port (by using port 0) 3583 t = logging.config.listen(0, verify) 3584 t.start() 3585 t.ready.wait() 3586 # Now get the port allocated 3587 port = t.port 3588 t.ready.clear() 3589 try: 3590 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3591 sock.settimeout(2.0) 3592 sock.connect(('localhost', port)) 3593 3594 slen = struct.pack('>L', len(text)) 3595 s = slen + text 3596 sentsofar = 0 3597 left = len(s) 3598 while left > 0: 3599 sent = sock.send(s[sentsofar:]) 3600 sentsofar += sent 3601 left -= sent 3602 sock.close() 3603 finally: 3604 t.ready.wait(2.0) 3605 logging.config.stopListening() 3606 threading_helper.join_thread(t) 3607 3608 @support.requires_working_socket() 3609 def test_listen_config_10_ok(self): 3610 with support.captured_stdout() as output: 3611 self.setup_via_listener(json.dumps(self.config10)) 3612 self.check_handler('hand1', logging.StreamHandler) 3613 logger = logging.getLogger("compiler.parser") 3614 logger.warning(self.next_message()) 3615 logger = logging.getLogger('compiler') 3616 # Not output, because filtered 3617 logger.warning(self.next_message()) 3618 logger = logging.getLogger('compiler.lexer') 3619 # Not output, because filtered 3620 logger.warning(self.next_message()) 3621 logger = logging.getLogger("compiler.parser.codegen") 3622 # Output, as not filtered 3623 logger.error(self.next_message()) 3624 self.assert_log_lines([ 3625 ('WARNING', '1'), 3626 ('ERROR', '4'), 3627 ], stream=output) 3628 3629 @support.requires_working_socket() 3630 def test_listen_config_1_ok(self): 3631 with support.captured_stdout() as output: 3632 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 3633 logger = logging.getLogger("compiler.parser") 3634 # Both will output a message 3635 logger.info(self.next_message()) 3636 logger.error(self.next_message()) 3637 self.assert_log_lines([ 3638 ('INFO', '1'), 3639 ('ERROR', '2'), 3640 ], stream=output) 3641 # Original logger output is empty. 3642 self.assert_log_lines([]) 3643 3644 @support.requires_working_socket() 3645 def test_listen_verify(self): 3646 3647 def verify_fail(stuff): 3648 return None 3649 3650 def verify_reverse(stuff): 3651 return stuff[::-1] 3652 3653 logger = logging.getLogger("compiler.parser") 3654 to_send = textwrap.dedent(ConfigFileTest.config1) 3655 # First, specify a verification function that will fail. 3656 # We expect to see no output, since our configuration 3657 # never took effect. 3658 with support.captured_stdout() as output: 3659 self.setup_via_listener(to_send, verify_fail) 3660 # Both will output a message 3661 logger.info(self.next_message()) 3662 logger.error(self.next_message()) 3663 self.assert_log_lines([], stream=output) 3664 # Original logger output has the stuff we logged. 3665 self.assert_log_lines([ 3666 ('INFO', '1'), 3667 ('ERROR', '2'), 3668 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3669 3670 # Now, perform no verification. Our configuration 3671 # should take effect. 3672 3673 with support.captured_stdout() as output: 3674 self.setup_via_listener(to_send) # no verify callable specified 3675 logger = logging.getLogger("compiler.parser") 3676 # Both will output a message 3677 logger.info(self.next_message()) 3678 logger.error(self.next_message()) 3679 self.assert_log_lines([ 3680 ('INFO', '3'), 3681 ('ERROR', '4'), 3682 ], stream=output) 3683 # Original logger output still has the stuff we logged before. 3684 self.assert_log_lines([ 3685 ('INFO', '1'), 3686 ('ERROR', '2'), 3687 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3688 3689 # Now, perform verification which transforms the bytes. 3690 3691 with support.captured_stdout() as output: 3692 self.setup_via_listener(to_send[::-1], verify_reverse) 3693 logger = logging.getLogger("compiler.parser") 3694 # Both will output a message 3695 logger.info(self.next_message()) 3696 logger.error(self.next_message()) 3697 self.assert_log_lines([ 3698 ('INFO', '5'), 3699 ('ERROR', '6'), 3700 ], stream=output) 3701 # Original logger output still has the stuff we logged before. 3702 self.assert_log_lines([ 3703 ('INFO', '1'), 3704 ('ERROR', '2'), 3705 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3706 3707 def test_bad_format(self): 3708 self.assertRaises(ValueError, self.apply_config, self.bad_format) 3709 3710 def test_bad_format_with_dollar_style(self): 3711 config = copy.deepcopy(self.bad_format) 3712 config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}" 3713 3714 self.apply_config(config) 3715 handler = logging.getLogger('mymodule').handlers[0] 3716 self.assertIsInstance(handler.target, logging.Handler) 3717 self.assertIsInstance(handler.formatter._style, 3718 logging.StringTemplateStyle) 3719 self.assertEqual(sorted(logging.getHandlerNames()), 3720 ['bufferGlobal', 'fileGlobal']) 3721 3722 def test_custom_formatter_class_with_validate(self): 3723 self.apply_config(self.custom_formatter_class_validate) 3724 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3725 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3726 3727 def test_custom_formatter_class_with_validate2(self): 3728 self.apply_config(self.custom_formatter_class_validate2) 3729 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3730 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3731 3732 def test_custom_formatter_class_with_validate2_with_wrong_fmt(self): 3733 config = self.custom_formatter_class_validate.copy() 3734 config['formatters']['form1']['style'] = "$" 3735 3736 # Exception should not be raised as we have configured 'validate' to False 3737 self.apply_config(config) 3738 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3739 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3740 3741 def test_custom_formatter_class_with_validate3(self): 3742 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3) 3743 3744 def test_custom_formatter_function_with_validate(self): 3745 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function) 3746 3747 def test_custom_formatter_function_with_defaults(self): 3748 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_defaults) 3749 3750 def test_baseconfig(self): 3751 d = { 3752 'atuple': (1, 2, 3), 3753 'alist': ['a', 'b', 'c'], 3754 'adict': { 3755 'd': 'e', 'f': 3 , 3756 'alpha numeric 1 with spaces' : 5, 3757 'alpha numeric 1 %( - © ©ß¯' : 9, 3758 'alpha numeric ] 1 with spaces' : 15, 3759 'alpha ]] numeric 1 %( - © ©ß¯]' : 19, 3760 ' alpha [ numeric 1 %( - © ©ß¯] ' : 11, 3761 ' alpha ' : 32, 3762 '' : 10, 3763 'nest4' : { 3764 'd': 'e', 'f': 3 , 3765 'alpha numeric 1 with spaces' : 5, 3766 'alpha numeric 1 %( - © ©ß¯' : 9, 3767 '' : 10, 3768 'somelist' : ('g', ('h', 'i'), 'j'), 3769 'somedict' : { 3770 'a' : 1, 3771 'a with 1 and space' : 3, 3772 'a with ( and space' : 4, 3773 } 3774 } 3775 }, 3776 'nest1': ('g', ('h', 'i'), 'j'), 3777 'nest2': ['k', ['l', 'm'], 'n'], 3778 'nest3': ['o', 'cfg://alist', 'p'], 3779 } 3780 bc = logging.config.BaseConfigurator(d) 3781 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 3782 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 3783 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 3784 self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3785 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3786 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3787 self.assertEqual(bc.convert('cfg://adict[alpha numeric 1 with spaces]'), 5) 3788 self.assertEqual(bc.convert('cfg://adict[alpha numeric 1 %( - © ©ß¯]'), 9) 3789 self.assertEqual(bc.convert('cfg://adict[]'), 10) 3790 self.assertEqual(bc.convert('cfg://adict.nest4.d'), 'e') 3791 self.assertEqual(bc.convert('cfg://adict.nest4[d]'), 'e') 3792 self.assertEqual(bc.convert('cfg://adict[nest4].d'), 'e') 3793 self.assertEqual(bc.convert('cfg://adict[nest4][f]'), 3) 3794 self.assertEqual(bc.convert('cfg://adict[nest4][alpha numeric 1 with spaces]'), 5) 3795 self.assertEqual(bc.convert('cfg://adict[nest4][alpha numeric 1 %( - © ©ß¯]'), 9) 3796 self.assertEqual(bc.convert('cfg://adict[nest4][]'), 10) 3797 self.assertEqual(bc.convert('cfg://adict[nest4][somelist][0]'), 'g') 3798 self.assertEqual(bc.convert('cfg://adict[nest4][somelist][1][0]'), 'h') 3799 self.assertEqual(bc.convert('cfg://adict[nest4][somelist][1][1]'), 'i') 3800 self.assertEqual(bc.convert('cfg://adict[nest4][somelist][2]'), 'j') 3801 self.assertEqual(bc.convert('cfg://adict[nest4].somedict.a'), 1) 3802 self.assertEqual(bc.convert('cfg://adict[nest4].somedict[a]'), 1) 3803 self.assertEqual(bc.convert('cfg://adict[nest4].somedict[a with 1 and space]'), 3) 3804 self.assertEqual(bc.convert('cfg://adict[nest4].somedict[a with ( and space]'), 4) 3805 self.assertEqual(bc.convert('cfg://adict.nest4.somelist[1][1]'), 'i') 3806 self.assertEqual(bc.convert('cfg://adict.nest4.somelist[2]'), 'j') 3807 self.assertEqual(bc.convert('cfg://adict.nest4.somedict.a'), 1) 3808 self.assertEqual(bc.convert('cfg://adict.nest4.somedict[a]'), 1) 3809 v = bc.convert('cfg://nest3') 3810 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3811 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3812 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3813 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3814 self.assertRaises(KeyError, bc.convert, 'cfg://adict[alpha numeric ] 1 with spaces]') 3815 self.assertRaises(ValueError, bc.convert, 'cfg://adict[ alpha ]] numeric 1 %( - © ©ß¯] ]') 3816 self.assertRaises(ValueError, bc.convert, 'cfg://adict[ alpha [ numeric 1 %( - © ©ß¯] ]') 3817 3818 def test_namedtuple(self): 3819 # see bpo-39142 3820 from collections import namedtuple 3821 3822 class MyHandler(logging.StreamHandler): 3823 def __init__(self, resource, *args, **kwargs): 3824 super().__init__(*args, **kwargs) 3825 self.resource: namedtuple = resource 3826 3827 def emit(self, record): 3828 record.msg += f' {self.resource.type}' 3829 return super().emit(record) 3830 3831 Resource = namedtuple('Resource', ['type', 'labels']) 3832 resource = Resource(type='my_type', labels=['a']) 3833 3834 config = { 3835 'version': 1, 3836 'handlers': { 3837 'myhandler': { 3838 '()': MyHandler, 3839 'resource': resource 3840 } 3841 }, 3842 'root': {'level': 'INFO', 'handlers': ['myhandler']}, 3843 } 3844 with support.captured_stderr() as stderr: 3845 self.apply_config(config) 3846 logging.info('some log') 3847 self.assertEqual(stderr.getvalue(), 'some log my_type\n') 3848 3849 def test_config_callable_filter_works(self): 3850 def filter_(_): 3851 return 1 3852 self.apply_config({ 3853 "version": 1, "root": {"level": "DEBUG", "filters": [filter_]} 3854 }) 3855 assert logging.getLogger().filters[0] is filter_ 3856 logging.getLogger().filters = [] 3857 3858 def test_config_filter_works(self): 3859 filter_ = logging.Filter("spam.eggs") 3860 self.apply_config({ 3861 "version": 1, "root": {"level": "DEBUG", "filters": [filter_]} 3862 }) 3863 assert logging.getLogger().filters[0] is filter_ 3864 logging.getLogger().filters = [] 3865 3866 def test_config_filter_method_works(self): 3867 class FakeFilter: 3868 def filter(self, _): 3869 return 1 3870 filter_ = FakeFilter() 3871 self.apply_config({ 3872 "version": 1, "root": {"level": "DEBUG", "filters": [filter_]} 3873 }) 3874 assert logging.getLogger().filters[0] is filter_ 3875 logging.getLogger().filters = [] 3876 3877 def test_invalid_type_raises(self): 3878 class NotAFilter: pass 3879 for filter_ in [None, 1, NotAFilter()]: 3880 self.assertRaises( 3881 ValueError, 3882 self.apply_config, 3883 {"version": 1, "root": {"level": "DEBUG", "filters": [filter_]}} 3884 ) 3885 3886 def do_queuehandler_configuration(self, qspec, lspec): 3887 cd = copy.deepcopy(self.config_queue_handler) 3888 fn = make_temp_file('.log', 'test_logging-cqh-') 3889 cd['handlers']['h1']['filename'] = fn 3890 if qspec is not None: 3891 cd['handlers']['ah']['queue'] = qspec 3892 if lspec is not None: 3893 cd['handlers']['ah']['listener'] = lspec 3894 qh = None 3895 try: 3896 self.apply_config(cd) 3897 qh = logging.getHandlerByName('ah') 3898 self.assertEqual(sorted(logging.getHandlerNames()), ['ah', 'h1']) 3899 self.assertIsNotNone(qh.listener) 3900 qh.listener.start() 3901 logging.debug('foo') 3902 logging.info('bar') 3903 logging.warning('baz') 3904 3905 # Need to let the listener thread finish its work 3906 while support.sleeping_retry(support.LONG_TIMEOUT, 3907 "queue not empty"): 3908 if qh.listener.queue.empty(): 3909 break 3910 3911 # wait until the handler completed its last task 3912 qh.listener.queue.join() 3913 3914 with open(fn, encoding='utf-8') as f: 3915 data = f.read().splitlines() 3916 self.assertEqual(data, ['foo', 'bar', 'baz']) 3917 finally: 3918 if qh: 3919 qh.listener.stop() 3920 h = logging.getHandlerByName('h1') 3921 if h: 3922 self.addCleanup(closeFileHandler, h, fn) 3923 else: 3924 self.addCleanup(os.remove, fn) 3925 3926 @threading_helper.requires_working_threading() 3927 @support.requires_subprocess() 3928 def test_config_queue_handler(self): 3929 qs = [CustomQueue(), CustomQueueProtocol()] 3930 dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10} 3931 for cls in ['CustomQueue', 'CustomQueueProtocol']] 3932 dl = { 3933 '()': __name__ + '.listenerMaker', 3934 'arg1': None, 3935 'arg2': None, 3936 'respect_handler_level': True 3937 } 3938 qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs) 3939 lvalues = (None, __name__ + '.CustomListener', dl, CustomListener) 3940 for qspec, lspec in itertools.product(qvalues, lvalues): 3941 self.do_queuehandler_configuration(qspec, lspec) 3942 3943 # Some failure cases 3944 qvalues = (None, 4, int, '', 'foo') 3945 lvalues = (None, 4, int, '', 'bar') 3946 for qspec, lspec in itertools.product(qvalues, lvalues): 3947 if lspec is None and qspec is None: 3948 continue 3949 with self.assertRaises(ValueError) as ctx: 3950 self.do_queuehandler_configuration(qspec, lspec) 3951 msg = str(ctx.exception) 3952 self.assertEqual(msg, "Unable to configure handler 'ah'") 3953 3954 def _apply_simple_queue_listener_configuration(self, qspec): 3955 self.apply_config({ 3956 "version": 1, 3957 "handlers": { 3958 "queue_listener": { 3959 "class": "logging.handlers.QueueHandler", 3960 "queue": qspec, 3961 }, 3962 }, 3963 }) 3964 3965 @threading_helper.requires_working_threading() 3966 @support.requires_subprocess() 3967 @patch("multiprocessing.Manager") 3968 def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager): 3969 # gh-120868, gh-121723, gh-124653 3970 3971 for qspec in [ 3972 {"()": "queue.Queue", "maxsize": -1}, 3973 queue.Queue(), 3974 # queue.SimpleQueue does not inherit from queue.Queue 3975 queue.SimpleQueue(), 3976 # CustomQueueFakeProtocol passes the checks but will not be usable 3977 # since the signatures are incompatible. Checking the Queue API 3978 # without testing the type of the actual queue is a trade-off 3979 # between usability and the work we need to do in order to safely 3980 # check that the queue object correctly implements the API. 3981 CustomQueueFakeProtocol(), 3982 MinimalQueueProtocol(), 3983 ]: 3984 with self.subTest(qspec=qspec): 3985 self._apply_simple_queue_listener_configuration(qspec) 3986 manager.assert_not_called() 3987 3988 @patch("multiprocessing.Manager") 3989 def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager): 3990 # gh-120868, gh-121723 3991 3992 for qspec in [object(), CustomQueueWrongProtocol()]: 3993 with self.subTest(qspec=qspec), self.assertRaises(ValueError): 3994 self._apply_simple_queue_listener_configuration(qspec) 3995 manager.assert_not_called() 3996 3997 @skip_if_tsan_fork 3998 @support.requires_subprocess() 3999 @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing" 4000 " assertions in multiprocessing") 4001 def test_config_reject_simple_queue_handler_multiprocessing_context(self): 4002 # multiprocessing.SimpleQueue does not implement 'put_nowait' 4003 # and thus cannot be used as a queue-like object (gh-124653) 4004 4005 import multiprocessing 4006 4007 if support.MS_WINDOWS: 4008 start_methods = ['spawn'] 4009 else: 4010 start_methods = ['spawn', 'fork', 'forkserver'] 4011 4012 for start_method in start_methods: 4013 with self.subTest(start_method=start_method): 4014 ctx = multiprocessing.get_context(start_method) 4015 qspec = ctx.SimpleQueue() 4016 with self.assertRaises(ValueError): 4017 self._apply_simple_queue_listener_configuration(qspec) 4018 4019 @skip_if_tsan_fork 4020 @support.requires_subprocess() 4021 @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing" 4022 " assertions in multiprocessing") 4023 def test_config_queue_handler_multiprocessing_context(self): 4024 # regression test for gh-121723 4025 if support.MS_WINDOWS: 4026 start_methods = ['spawn'] 4027 else: 4028 start_methods = ['spawn', 'fork', 'forkserver'] 4029 for start_method in start_methods: 4030 with self.subTest(start_method=start_method): 4031 ctx = multiprocessing.get_context(start_method) 4032 with ctx.Manager() as manager: 4033 q = manager.Queue() 4034 records = [] 4035 # use 1 process and 1 task per child to put 1 record 4036 with ctx.Pool(1, initializer=self._mpinit_issue121723, 4037 initargs=(q, "text"), maxtasksperchild=1): 4038 records.append(q.get(timeout=60)) 4039 self.assertTrue(q.empty()) 4040 self.assertEqual(len(records), 1) 4041 4042 @staticmethod 4043 def _mpinit_issue121723(qspec, message_to_log): 4044 # static method for pickling support 4045 logging.config.dictConfig({ 4046 'version': 1, 4047 'disable_existing_loggers': True, 4048 'handlers': { 4049 'log_to_parent': { 4050 'class': 'logging.handlers.QueueHandler', 4051 'queue': qspec 4052 } 4053 }, 4054 'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'} 4055 }) 4056 # log a message (this creates a record put in the queue) 4057 logging.getLogger().info(message_to_log) 4058 4059 @skip_if_tsan_fork 4060 @support.requires_subprocess() 4061 def test_multiprocessing_queues(self): 4062 # See gh-119819 4063 4064 cd = copy.deepcopy(self.config_queue_handler) 4065 from multiprocessing import Queue as MQ, Manager as MM 4066 q1 = MQ() # this can't be pickled 4067 q2 = MM().Queue() # a proxy queue for use when pickling is needed 4068 q3 = MM().JoinableQueue() # a joinable proxy queue 4069 for qspec in (q1, q2, q3): 4070 fn = make_temp_file('.log', 'test_logging-cmpqh-') 4071 cd['handlers']['h1']['filename'] = fn 4072 cd['handlers']['ah']['queue'] = qspec 4073 qh = None 4074 try: 4075 self.apply_config(cd) 4076 qh = logging.getHandlerByName('ah') 4077 self.assertEqual(sorted(logging.getHandlerNames()), ['ah', 'h1']) 4078 self.assertIsNotNone(qh.listener) 4079 self.assertIs(qh.queue, qspec) 4080 self.assertIs(qh.listener.queue, qspec) 4081 finally: 4082 h = logging.getHandlerByName('h1') 4083 if h: 4084 self.addCleanup(closeFileHandler, h, fn) 4085 else: 4086 self.addCleanup(os.remove, fn) 4087 4088 def test_90195(self): 4089 # See gh-90195 4090 config = { 4091 'version': 1, 4092 'disable_existing_loggers': False, 4093 'handlers': { 4094 'console': { 4095 'level': 'DEBUG', 4096 'class': 'logging.StreamHandler', 4097 }, 4098 }, 4099 'loggers': { 4100 'a': { 4101 'level': 'DEBUG', 4102 'handlers': ['console'] 4103 } 4104 } 4105 } 4106 logger = logging.getLogger('a') 4107 self.assertFalse(logger.disabled) 4108 self.apply_config(config) 4109 self.assertFalse(logger.disabled) 4110 # Should disable all loggers ... 4111 self.apply_config({'version': 1}) 4112 self.assertTrue(logger.disabled) 4113 del config['disable_existing_loggers'] 4114 self.apply_config(config) 4115 # Logger should be enabled, since explicitly mentioned 4116 self.assertFalse(logger.disabled) 4117 4118 def test_111615(self): 4119 # See gh-111615 4120 import_helper.import_module('_multiprocessing') # see gh-113692 4121 mp = import_helper.import_module('multiprocessing') 4122 4123 config = { 4124 'version': 1, 4125 'handlers': { 4126 'sink': { 4127 'class': 'logging.handlers.QueueHandler', 4128 'queue': mp.get_context('spawn').Queue(), 4129 }, 4130 }, 4131 'root': { 4132 'handlers': ['sink'], 4133 'level': 'DEBUG', 4134 }, 4135 } 4136 logging.config.dictConfig(config) 4137 4138 # gh-118868: check if kwargs are passed to logging QueueHandler 4139 def test_kwargs_passing(self): 4140 class CustomQueueHandler(logging.handlers.QueueHandler): 4141 def __init__(self, *args, **kwargs): 4142 super().__init__(queue.Queue()) 4143 self.custom_kwargs = kwargs 4144 4145 custom_kwargs = {'foo': 'bar'} 4146 4147 config = { 4148 'version': 1, 4149 'handlers': { 4150 'custom': { 4151 'class': CustomQueueHandler, 4152 **custom_kwargs 4153 }, 4154 }, 4155 'root': { 4156 'level': 'DEBUG', 4157 'handlers': ['custom'] 4158 } 4159 } 4160 4161 logging.config.dictConfig(config) 4162 4163 handler = logging.getHandlerByName('custom') 4164 self.assertEqual(handler.custom_kwargs, custom_kwargs) 4165 4166 4167class ManagerTest(BaseTest): 4168 def test_manager_loggerclass(self): 4169 logged = [] 4170 4171 class MyLogger(logging.Logger): 4172 def _log(self, level, msg, args, exc_info=None, extra=None): 4173 logged.append(msg) 4174 4175 man = logging.Manager(None) 4176 self.assertRaises(TypeError, man.setLoggerClass, int) 4177 man.setLoggerClass(MyLogger) 4178 logger = man.getLogger('test') 4179 logger.warning('should appear in logged') 4180 logging.warning('should not appear in logged') 4181 4182 self.assertEqual(logged, ['should appear in logged']) 4183 4184 def test_set_log_record_factory(self): 4185 man = logging.Manager(None) 4186 expected = object() 4187 man.setLogRecordFactory(expected) 4188 self.assertEqual(man.logRecordFactory, expected) 4189 4190class ChildLoggerTest(BaseTest): 4191 def test_child_loggers(self): 4192 r = logging.getLogger() 4193 l1 = logging.getLogger('abc') 4194 l2 = logging.getLogger('def.ghi') 4195 c1 = r.getChild('xyz') 4196 c2 = r.getChild('uvw.xyz') 4197 self.assertIs(c1, logging.getLogger('xyz')) 4198 self.assertIs(c2, logging.getLogger('uvw.xyz')) 4199 c1 = l1.getChild('def') 4200 c2 = c1.getChild('ghi') 4201 c3 = l1.getChild('def.ghi') 4202 self.assertIs(c1, logging.getLogger('abc.def')) 4203 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 4204 self.assertIs(c2, c3) 4205 4206 def test_get_children(self): 4207 r = logging.getLogger() 4208 l1 = logging.getLogger('foo') 4209 l2 = logging.getLogger('foo.bar') 4210 l3 = logging.getLogger('foo.bar.baz.bozz') 4211 l4 = logging.getLogger('bar') 4212 kids = r.getChildren() 4213 expected = {l1, l4} 4214 self.assertEqual(expected, kids & expected) # might be other kids for root 4215 self.assertNotIn(l2, expected) 4216 kids = l1.getChildren() 4217 self.assertEqual({l2}, kids) 4218 kids = l2.getChildren() 4219 self.assertEqual(set(), kids) 4220 4221class DerivedLogRecord(logging.LogRecord): 4222 pass 4223 4224class LogRecordFactoryTest(BaseTest): 4225 4226 def setUp(self): 4227 class CheckingFilter(logging.Filter): 4228 def __init__(self, cls): 4229 self.cls = cls 4230 4231 def filter(self, record): 4232 t = type(record) 4233 if t is not self.cls: 4234 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 4235 self.cls) 4236 raise TypeError(msg) 4237 return True 4238 4239 BaseTest.setUp(self) 4240 self.filter = CheckingFilter(DerivedLogRecord) 4241 self.root_logger.addFilter(self.filter) 4242 self.orig_factory = logging.getLogRecordFactory() 4243 4244 def tearDown(self): 4245 self.root_logger.removeFilter(self.filter) 4246 BaseTest.tearDown(self) 4247 logging.setLogRecordFactory(self.orig_factory) 4248 4249 def test_logrecord_class(self): 4250 self.assertRaises(TypeError, self.root_logger.warning, 4251 self.next_message()) 4252 logging.setLogRecordFactory(DerivedLogRecord) 4253 self.root_logger.error(self.next_message()) 4254 self.assert_log_lines([ 4255 ('root', 'ERROR', '2'), 4256 ]) 4257 4258 4259@threading_helper.requires_working_threading() 4260class QueueHandlerTest(BaseTest): 4261 # Do not bother with a logger name group. 4262 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 4263 4264 def setUp(self): 4265 BaseTest.setUp(self) 4266 self.queue = queue.Queue(-1) 4267 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 4268 self.name = 'que' 4269 self.que_logger = logging.getLogger('que') 4270 self.que_logger.propagate = False 4271 self.que_logger.setLevel(logging.WARNING) 4272 self.que_logger.addHandler(self.que_hdlr) 4273 4274 def tearDown(self): 4275 self.que_hdlr.close() 4276 BaseTest.tearDown(self) 4277 4278 def test_queue_handler(self): 4279 self.que_logger.debug(self.next_message()) 4280 self.assertRaises(queue.Empty, self.queue.get_nowait) 4281 self.que_logger.info(self.next_message()) 4282 self.assertRaises(queue.Empty, self.queue.get_nowait) 4283 msg = self.next_message() 4284 self.que_logger.warning(msg) 4285 data = self.queue.get_nowait() 4286 self.assertTrue(isinstance(data, logging.LogRecord)) 4287 self.assertEqual(data.name, self.que_logger.name) 4288 self.assertEqual((data.msg, data.args), (msg, None)) 4289 4290 def test_formatting(self): 4291 msg = self.next_message() 4292 levelname = logging.getLevelName(logging.WARNING) 4293 log_format_str = '{name} -> {levelname}: {message}' 4294 formatted_msg = log_format_str.format(name=self.name, 4295 levelname=levelname, message=msg) 4296 formatter = logging.Formatter(self.log_format) 4297 self.que_hdlr.setFormatter(formatter) 4298 self.que_logger.warning(msg) 4299 log_record = self.queue.get_nowait() 4300 self.assertEqual(formatted_msg, log_record.msg) 4301 self.assertEqual(formatted_msg, log_record.message) 4302 4303 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 4304 'logging.handlers.QueueListener required for this test') 4305 def test_queue_listener(self): 4306 handler = TestHandler(support.Matcher()) 4307 listener = logging.handlers.QueueListener(self.queue, handler) 4308 listener.start() 4309 try: 4310 self.que_logger.warning(self.next_message()) 4311 self.que_logger.error(self.next_message()) 4312 self.que_logger.critical(self.next_message()) 4313 finally: 4314 listener.stop() 4315 listener.stop() # gh-114706 - ensure no crash if called again 4316 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 4317 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 4318 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 4319 handler.close() 4320 4321 # Now test with respect_handler_level set 4322 4323 handler = TestHandler(support.Matcher()) 4324 handler.setLevel(logging.CRITICAL) 4325 listener = logging.handlers.QueueListener(self.queue, handler, 4326 respect_handler_level=True) 4327 listener.start() 4328 try: 4329 self.que_logger.warning(self.next_message()) 4330 self.que_logger.error(self.next_message()) 4331 self.que_logger.critical(self.next_message()) 4332 finally: 4333 listener.stop() 4334 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 4335 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 4336 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 4337 handler.close() 4338 4339 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 4340 'logging.handlers.QueueListener required for this test') 4341 def test_queue_listener_with_StreamHandler(self): 4342 # Test that traceback and stack-info only appends once (bpo-34334, bpo-46755). 4343 listener = logging.handlers.QueueListener(self.queue, self.root_hdlr) 4344 listener.start() 4345 try: 4346 1 / 0 4347 except ZeroDivisionError as e: 4348 exc = e 4349 self.que_logger.exception(self.next_message(), exc_info=exc) 4350 self.que_logger.error(self.next_message(), stack_info=True) 4351 listener.stop() 4352 self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1) 4353 self.assertEqual(self.stream.getvalue().strip().count('Stack'), 1) 4354 4355 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 4356 'logging.handlers.QueueListener required for this test') 4357 def test_queue_listener_with_multiple_handlers(self): 4358 # Test that queue handler format doesn't affect other handler formats (bpo-35726). 4359 self.que_hdlr.setFormatter(self.root_formatter) 4360 self.que_logger.addHandler(self.root_hdlr) 4361 4362 listener = logging.handlers.QueueListener(self.queue, self.que_hdlr) 4363 listener.start() 4364 self.que_logger.error("error") 4365 listener.stop() 4366 self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error") 4367 4368if hasattr(logging.handlers, 'QueueListener'): 4369 import multiprocessing 4370 from unittest.mock import patch 4371 4372 @skip_if_tsan_fork 4373 @threading_helper.requires_working_threading() 4374 class QueueListenerTest(BaseTest): 4375 """ 4376 Tests based on patch submitted for issue #27930. Ensure that 4377 QueueListener handles all log messages. 4378 """ 4379 4380 repeat = 20 4381 4382 @staticmethod 4383 def setup_and_log(log_queue, ident): 4384 """ 4385 Creates a logger with a QueueHandler that logs to a queue read by a 4386 QueueListener. Starts the listener, logs five messages, and stops 4387 the listener. 4388 """ 4389 logger = logging.getLogger('test_logger_with_id_%s' % ident) 4390 logger.setLevel(logging.DEBUG) 4391 handler = logging.handlers.QueueHandler(log_queue) 4392 logger.addHandler(handler) 4393 listener = logging.handlers.QueueListener(log_queue) 4394 listener.start() 4395 4396 logger.info('one') 4397 logger.info('two') 4398 logger.info('three') 4399 logger.info('four') 4400 logger.info('five') 4401 4402 listener.stop() 4403 logger.removeHandler(handler) 4404 handler.close() 4405 4406 @patch.object(logging.handlers.QueueListener, 'handle') 4407 def test_handle_called_with_queue_queue(self, mock_handle): 4408 for i in range(self.repeat): 4409 log_queue = queue.Queue() 4410 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 4411 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 4412 'correct number of handled log messages') 4413 4414 @patch.object(logging.handlers.QueueListener, 'handle') 4415 def test_handle_called_with_mp_queue(self, mock_handle): 4416 # bpo-28668: The multiprocessing (mp) module is not functional 4417 # when the mp.synchronize module cannot be imported. 4418 support.skip_if_broken_multiprocessing_synchronize() 4419 for i in range(self.repeat): 4420 log_queue = multiprocessing.Queue() 4421 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 4422 log_queue.close() 4423 log_queue.join_thread() 4424 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 4425 'correct number of handled log messages') 4426 4427 @staticmethod 4428 def get_all_from_queue(log_queue): 4429 try: 4430 while True: 4431 yield log_queue.get_nowait() 4432 except queue.Empty: 4433 return [] 4434 4435 def test_no_messages_in_queue_after_stop(self): 4436 """ 4437 Five messages are logged then the QueueListener is stopped. This 4438 test then gets everything off the queue. Failure of this test 4439 indicates that messages were not registered on the queue until 4440 _after_ the QueueListener stopped. 4441 """ 4442 # bpo-28668: The multiprocessing (mp) module is not functional 4443 # when the mp.synchronize module cannot be imported. 4444 support.skip_if_broken_multiprocessing_synchronize() 4445 for i in range(self.repeat): 4446 queue = multiprocessing.Queue() 4447 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 4448 # time.sleep(1) 4449 items = list(self.get_all_from_queue(queue)) 4450 queue.close() 4451 queue.join_thread() 4452 4453 expected = [[], [logging.handlers.QueueListener._sentinel]] 4454 self.assertIn(items, expected, 4455 'Found unexpected messages in queue: %s' % ( 4456 [m.msg if isinstance(m, logging.LogRecord) 4457 else m for m in items])) 4458 4459 def test_calls_task_done_after_stop(self): 4460 # Issue 36813: Make sure queue.join does not deadlock. 4461 log_queue = queue.Queue() 4462 listener = logging.handlers.QueueListener(log_queue) 4463 listener.start() 4464 listener.stop() 4465 with self.assertRaises(ValueError): 4466 # Make sure all tasks are done and .join won't block. 4467 log_queue.task_done() 4468 4469 4470ZERO = datetime.timedelta(0) 4471 4472class UTC(datetime.tzinfo): 4473 def utcoffset(self, dt): 4474 return ZERO 4475 4476 dst = utcoffset 4477 4478 def tzname(self, dt): 4479 return 'UTC' 4480 4481utc = UTC() 4482 4483class AssertErrorMessage: 4484 4485 def assert_error_message(self, exception, message, *args, **kwargs): 4486 try: 4487 self.assertRaises((), *args, **kwargs) 4488 except exception as e: 4489 self.assertEqual(message, str(e)) 4490 4491class FormatterTest(unittest.TestCase, AssertErrorMessage): 4492 def setUp(self): 4493 self.common = { 4494 'name': 'formatter.test', 4495 'level': logging.DEBUG, 4496 'pathname': os.path.join('path', 'to', 'dummy.ext'), 4497 'lineno': 42, 4498 'exc_info': None, 4499 'func': None, 4500 'msg': 'Message with %d %s', 4501 'args': (2, 'placeholders'), 4502 } 4503 self.variants = { 4504 'custom': { 4505 'custom': 1234 4506 } 4507 } 4508 4509 def get_record(self, name=None): 4510 result = dict(self.common) 4511 if name is not None: 4512 result.update(self.variants[name]) 4513 return logging.makeLogRecord(result) 4514 4515 def test_percent(self): 4516 # Test %-formatting 4517 r = self.get_record() 4518 f = logging.Formatter('${%(message)s}') 4519 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 4520 f = logging.Formatter('%(random)s') 4521 self.assertRaises(ValueError, f.format, r) 4522 self.assertFalse(f.usesTime()) 4523 f = logging.Formatter('%(asctime)s') 4524 self.assertTrue(f.usesTime()) 4525 f = logging.Formatter('%(asctime)-15s') 4526 self.assertTrue(f.usesTime()) 4527 f = logging.Formatter('%(asctime)#15s') 4528 self.assertTrue(f.usesTime()) 4529 4530 def test_braces(self): 4531 # Test {}-formatting 4532 r = self.get_record() 4533 f = logging.Formatter('$%{message}%$', style='{') 4534 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 4535 f = logging.Formatter('{random}', style='{') 4536 self.assertRaises(ValueError, f.format, r) 4537 f = logging.Formatter("{message}", style='{') 4538 self.assertFalse(f.usesTime()) 4539 f = logging.Formatter('{asctime}', style='{') 4540 self.assertTrue(f.usesTime()) 4541 f = logging.Formatter('{asctime!s:15}', style='{') 4542 self.assertTrue(f.usesTime()) 4543 f = logging.Formatter('{asctime:15}', style='{') 4544 self.assertTrue(f.usesTime()) 4545 4546 def test_dollars(self): 4547 # Test $-formatting 4548 r = self.get_record() 4549 f = logging.Formatter('${message}', style='$') 4550 self.assertEqual(f.format(r), 'Message with 2 placeholders') 4551 f = logging.Formatter('$message', style='$') 4552 self.assertEqual(f.format(r), 'Message with 2 placeholders') 4553 f = logging.Formatter('$$%${message}%$$', style='$') 4554 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 4555 f = logging.Formatter('${random}', style='$') 4556 self.assertRaises(ValueError, f.format, r) 4557 self.assertFalse(f.usesTime()) 4558 f = logging.Formatter('${asctime}', style='$') 4559 self.assertTrue(f.usesTime()) 4560 f = logging.Formatter('$asctime', style='$') 4561 self.assertTrue(f.usesTime()) 4562 f = logging.Formatter('${message}', style='$') 4563 self.assertFalse(f.usesTime()) 4564 f = logging.Formatter('${asctime}--', style='$') 4565 self.assertTrue(f.usesTime()) 4566 4567 def test_format_validate(self): 4568 # Check correct formatting 4569 # Percentage style 4570 f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 4571 self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 4572 f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 4573 self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 4574 f = logging.Formatter("%(process)#+027.23X") 4575 self.assertEqual(f._fmt, "%(process)#+027.23X") 4576 f = logging.Formatter("%(foo)#.*g") 4577 self.assertEqual(f._fmt, "%(foo)#.*g") 4578 4579 # StrFormat Style 4580 f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{") 4581 self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}") 4582 f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{") 4583 self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}") 4584 f = logging.Formatter("{customfield!s:#<30}", style="{") 4585 self.assertEqual(f._fmt, "{customfield!s:#<30}") 4586 f = logging.Formatter("{message!r}", style="{") 4587 self.assertEqual(f._fmt, "{message!r}") 4588 f = logging.Formatter("{message!s}", style="{") 4589 self.assertEqual(f._fmt, "{message!s}") 4590 f = logging.Formatter("{message!a}", style="{") 4591 self.assertEqual(f._fmt, "{message!a}") 4592 f = logging.Formatter("{process!r:4.2}", style="{") 4593 self.assertEqual(f._fmt, "{process!r:4.2}") 4594 f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{") 4595 self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}") 4596 f = logging.Formatter("{process!s:{w},.{p}}", style="{") 4597 self.assertEqual(f._fmt, "{process!s:{w},.{p}}") 4598 f = logging.Formatter("{foo:12.{p}}", style="{") 4599 self.assertEqual(f._fmt, "{foo:12.{p}}") 4600 f = logging.Formatter("{foo:{w}.6}", style="{") 4601 self.assertEqual(f._fmt, "{foo:{w}.6}") 4602 f = logging.Formatter("{foo[0].bar[1].baz}", style="{") 4603 self.assertEqual(f._fmt, "{foo[0].bar[1].baz}") 4604 f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{") 4605 self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}") 4606 f = logging.Formatter("{12[k1].bar[k2].baz}", style="{") 4607 self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}") 4608 4609 # Dollar style 4610 f = logging.Formatter("${asctime} - $message", style="$") 4611 self.assertEqual(f._fmt, "${asctime} - $message") 4612 f = logging.Formatter("$bar $$", style="$") 4613 self.assertEqual(f._fmt, "$bar $$") 4614 f = logging.Formatter("$bar $$$$", style="$") 4615 self.assertEqual(f._fmt, "$bar $$$$") # this would print two $($$) 4616 4617 # Testing when ValueError being raised from incorrect format 4618 # Percentage Style 4619 self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z") 4620 self.assertRaises(ValueError, logging.Formatter, "%(asctime)b") 4621 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*") 4622 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s") 4623 self.assertRaises(ValueError, logging.Formatter, "%(asctime)_") 4624 self.assertRaises(ValueError, logging.Formatter, '{asctime}') 4625 self.assertRaises(ValueError, logging.Formatter, '${message}') 4626 self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f') # with both * and decimal number as precision 4627 self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f') 4628 4629 # StrFormat Style 4630 # Testing failure for '-' in field name 4631 self.assert_error_message( 4632 ValueError, 4633 "invalid format: invalid field name/expression: 'name-thing'", 4634 logging.Formatter, "{name-thing}", style="{" 4635 ) 4636 # Testing failure for style mismatch 4637 self.assert_error_message( 4638 ValueError, 4639 "invalid format: no fields", 4640 logging.Formatter, '%(asctime)s', style='{' 4641 ) 4642 # Testing failure for invalid conversion 4643 self.assert_error_message( 4644 ValueError, 4645 "invalid conversion: 'Z'" 4646 ) 4647 self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{') 4648 self.assert_error_message( 4649 ValueError, 4650 "invalid format: expected ':' after conversion specifier", 4651 logging.Formatter, '{asctime!aa:15}', style='{' 4652 ) 4653 # Testing failure for invalid spec 4654 self.assert_error_message( 4655 ValueError, 4656 "invalid format: bad specifier: '.2ff'", 4657 logging.Formatter, '{process:.2ff}', style='{' 4658 ) 4659 self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{') 4660 self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{') 4661 self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{') 4662 self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{') 4663 # Testing failure for mismatch braces 4664 self.assert_error_message( 4665 ValueError, 4666 "invalid format: expected '}' before end of string", 4667 logging.Formatter, '{process', style='{' 4668 ) 4669 self.assert_error_message( 4670 ValueError, 4671 "invalid format: Single '}' encountered in format string", 4672 logging.Formatter, 'process}', style='{' 4673 ) 4674 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{') 4675 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{') 4676 self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{') 4677 self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{') 4678 self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{') 4679 self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{') 4680 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{') 4681 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{') 4682 self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{') 4683 4684 # Dollar style 4685 # Testing failure for mismatch bare $ 4686 self.assert_error_message( 4687 ValueError, 4688 "invalid format: bare \'$\' not allowed", 4689 logging.Formatter, '$bar $$$', style='$' 4690 ) 4691 self.assert_error_message( 4692 ValueError, 4693 "invalid format: bare \'$\' not allowed", 4694 logging.Formatter, 'bar $', style='$' 4695 ) 4696 self.assert_error_message( 4697 ValueError, 4698 "invalid format: bare \'$\' not allowed", 4699 logging.Formatter, 'foo $.', style='$' 4700 ) 4701 # Testing failure for mismatch style 4702 self.assert_error_message( 4703 ValueError, 4704 "invalid format: no fields", 4705 logging.Formatter, '{asctime}', style='$' 4706 ) 4707 self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$') 4708 4709 # Testing failure for incorrect fields 4710 self.assert_error_message( 4711 ValueError, 4712 "invalid format: no fields", 4713 logging.Formatter, 'foo', style='$' 4714 ) 4715 self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$') 4716 4717 def test_defaults_parameter(self): 4718 fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message'] 4719 styles = ['%', '{', '$'] 4720 for fmt, style in zip(fmts, styles): 4721 f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'}) 4722 r = self.get_record() 4723 self.assertEqual(f.format(r), 'Default Message with 2 placeholders') 4724 r = self.get_record("custom") 4725 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 4726 4727 # Without default 4728 f = logging.Formatter(fmt, style=style) 4729 r = self.get_record() 4730 self.assertRaises(ValueError, f.format, r) 4731 4732 # Non-existing default is ignored 4733 f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'}) 4734 r = self.get_record("custom") 4735 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 4736 4737 def test_invalid_style(self): 4738 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 4739 4740 def test_time(self): 4741 r = self.get_record() 4742 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 4743 # We use None to indicate we want the local timezone 4744 # We're essentially converting a UTC time to local time 4745 r.created = time.mktime(dt.astimezone(None).timetuple()) 4746 r.msecs = 123 4747 f = logging.Formatter('%(asctime)s %(message)s') 4748 f.converter = time.gmtime 4749 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 4750 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 4751 f.format(r) 4752 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 4753 4754 def test_default_msec_format_none(self): 4755 class NoMsecFormatter(logging.Formatter): 4756 default_msec_format = None 4757 default_time_format = '%d/%m/%Y %H:%M:%S' 4758 4759 r = self.get_record() 4760 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc) 4761 r.created = time.mktime(dt.astimezone(None).timetuple()) 4762 f = NoMsecFormatter() 4763 f.converter = time.gmtime 4764 self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00') 4765 4766 def test_issue_89047(self): 4767 f = logging.Formatter(fmt='{asctime}.{msecs:03.0f} {message}', style='{', datefmt="%Y-%m-%d %H:%M:%S") 4768 for i in range(2500): 4769 time.sleep(0.0004) 4770 r = logging.makeLogRecord({'msg': 'Message %d' % (i + 1)}) 4771 s = f.format(r) 4772 self.assertNotIn('.1000', s) 4773 4774 def test_msecs_has_no_floating_point_precision_loss(self): 4775 # See issue gh-102402 4776 tests = ( 4777 # time_ns is approx. 2023-03-04 04:25:20 UTC 4778 # (time_ns, expected_msecs_value) 4779 (1_677_902_297_100_000_000, 100.0), # exactly 100ms 4780 (1_677_903_920_999_998_503, 999.0), # check truncating doesn't round 4781 (1_677_903_920_000_998_503, 0.0), # check truncating doesn't round 4782 (1_677_903_920_999_999_900, 0.0), # check rounding up 4783 ) 4784 for ns, want in tests: 4785 with patch('time.time_ns') as patched_ns: 4786 patched_ns.return_value = ns 4787 record = logging.makeLogRecord({'msg': 'test'}) 4788 with self.subTest(ns): 4789 self.assertEqual(record.msecs, want) 4790 self.assertEqual(record.created, ns / 1e9) 4791 self.assertAlmostEqual(record.created - int(record.created), 4792 record.msecs / 1e3, 4793 delta=1e-3) 4794 4795 def test_relativeCreated_has_higher_precision(self): 4796 # See issue gh-102402. 4797 # Run the code in the subprocess, because the time module should 4798 # be patched before the first import of the logging package. 4799 # Temporary unloading and re-importing the logging package has 4800 # side effects (including registering the atexit callback and 4801 # references leak). 4802 start_ns = 1_677_903_920_000_998_503 # approx. 2023-03-04 04:25:20 UTC 4803 offsets_ns = (200, 500, 12_354, 99_999, 1_677_903_456_999_123_456) 4804 code = textwrap.dedent(f""" 4805 start_ns = {start_ns!r} 4806 offsets_ns = {offsets_ns!r} 4807 start_monotonic_ns = start_ns - 1 4808 4809 import time 4810 # Only time.time_ns needs to be patched for the current 4811 # implementation, but patch also other functions to make 4812 # the test less implementation depending. 4813 old_time_ns = time.time_ns 4814 old_time = time.time 4815 old_monotonic_ns = time.monotonic_ns 4816 old_monotonic = time.monotonic 4817 time_ns_result = start_ns 4818 time.time_ns = lambda: time_ns_result 4819 time.time = lambda: time.time_ns()/1e9 4820 time.monotonic_ns = lambda: time_ns_result - start_monotonic_ns 4821 time.monotonic = lambda: time.monotonic_ns()/1e9 4822 try: 4823 import logging 4824 4825 for offset_ns in offsets_ns: 4826 # mock for log record creation 4827 time_ns_result = start_ns + offset_ns 4828 record = logging.makeLogRecord({{'msg': 'test'}}) 4829 print(record.created, record.relativeCreated) 4830 finally: 4831 time.time_ns = old_time_ns 4832 time.time = old_time 4833 time.monotonic_ns = old_monotonic_ns 4834 time.monotonic = old_monotonic 4835 """) 4836 rc, out, err = assert_python_ok("-c", code) 4837 out = out.decode() 4838 for offset_ns, line in zip(offsets_ns, out.splitlines(), strict=True): 4839 with self.subTest(offset_ns=offset_ns): 4840 created, relativeCreated = map(float, line.split()) 4841 self.assertAlmostEqual(created, (start_ns + offset_ns) / 1e9, places=6) 4842 # After PR gh-102412, precision (places) increases from 3 to 7 4843 self.assertAlmostEqual(relativeCreated, offset_ns / 1e6, places=7) 4844 4845 4846class TestBufferingFormatter(logging.BufferingFormatter): 4847 def formatHeader(self, records): 4848 return '[(%d)' % len(records) 4849 4850 def formatFooter(self, records): 4851 return '(%d)]' % len(records) 4852 4853class BufferingFormatterTest(unittest.TestCase): 4854 def setUp(self): 4855 self.records = [ 4856 logging.makeLogRecord({'msg': 'one'}), 4857 logging.makeLogRecord({'msg': 'two'}), 4858 ] 4859 4860 def test_default(self): 4861 f = logging.BufferingFormatter() 4862 self.assertEqual('', f.format([])) 4863 self.assertEqual('onetwo', f.format(self.records)) 4864 4865 def test_custom(self): 4866 f = TestBufferingFormatter() 4867 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 4868 lf = logging.Formatter('<%(message)s>') 4869 f = TestBufferingFormatter(lf) 4870 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 4871 4872class ExceptionTest(BaseTest): 4873 def test_formatting(self): 4874 r = self.root_logger 4875 h = RecordingHandler() 4876 r.addHandler(h) 4877 try: 4878 raise RuntimeError('deliberate mistake') 4879 except: 4880 logging.exception('failed', stack_info=True) 4881 r.removeHandler(h) 4882 h.close() 4883 r = h.records[0] 4884 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 4885 'call last):\n')) 4886 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 4887 'deliberate mistake')) 4888 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 4889 'call last):\n')) 4890 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 4891 'stack_info=True)')) 4892 4893 4894class LastResortTest(BaseTest): 4895 def test_last_resort(self): 4896 # Test the last resort handler 4897 root = self.root_logger 4898 root.removeHandler(self.root_hdlr) 4899 old_lastresort = logging.lastResort 4900 old_raise_exceptions = logging.raiseExceptions 4901 4902 try: 4903 with support.captured_stderr() as stderr: 4904 root.debug('This should not appear') 4905 self.assertEqual(stderr.getvalue(), '') 4906 root.warning('Final chance!') 4907 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 4908 4909 # No handlers and no last resort, so 'No handlers' message 4910 logging.lastResort = None 4911 with support.captured_stderr() as stderr: 4912 root.warning('Final chance!') 4913 msg = 'No handlers could be found for logger "root"\n' 4914 self.assertEqual(stderr.getvalue(), msg) 4915 4916 # 'No handlers' message only printed once 4917 with support.captured_stderr() as stderr: 4918 root.warning('Final chance!') 4919 self.assertEqual(stderr.getvalue(), '') 4920 4921 # If raiseExceptions is False, no message is printed 4922 root.manager.emittedNoHandlerWarning = False 4923 logging.raiseExceptions = False 4924 with support.captured_stderr() as stderr: 4925 root.warning('Final chance!') 4926 self.assertEqual(stderr.getvalue(), '') 4927 finally: 4928 root.addHandler(self.root_hdlr) 4929 logging.lastResort = old_lastresort 4930 logging.raiseExceptions = old_raise_exceptions 4931 4932 4933class FakeHandler: 4934 4935 def __init__(self, identifier, called): 4936 for method in ('acquire', 'flush', 'close', 'release'): 4937 setattr(self, method, self.record_call(identifier, method, called)) 4938 4939 def record_call(self, identifier, method_name, called): 4940 def inner(): 4941 called.append('{} - {}'.format(identifier, method_name)) 4942 return inner 4943 4944 4945class RecordingHandler(logging.NullHandler): 4946 4947 def __init__(self, *args, **kwargs): 4948 super(RecordingHandler, self).__init__(*args, **kwargs) 4949 self.records = [] 4950 4951 def handle(self, record): 4952 """Keep track of all the emitted records.""" 4953 self.records.append(record) 4954 4955 4956class ShutdownTest(BaseTest): 4957 4958 """Test suite for the shutdown method.""" 4959 4960 def setUp(self): 4961 super(ShutdownTest, self).setUp() 4962 self.called = [] 4963 4964 raise_exceptions = logging.raiseExceptions 4965 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 4966 4967 def raise_error(self, error): 4968 def inner(): 4969 raise error() 4970 return inner 4971 4972 def test_no_failure(self): 4973 # create some fake handlers 4974 handler0 = FakeHandler(0, self.called) 4975 handler1 = FakeHandler(1, self.called) 4976 handler2 = FakeHandler(2, self.called) 4977 4978 # create live weakref to those handlers 4979 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 4980 4981 logging.shutdown(handlerList=list(handlers)) 4982 4983 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 4984 '1 - acquire', '1 - flush', '1 - close', '1 - release', 4985 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 4986 self.assertEqual(expected, self.called) 4987 4988 def _test_with_failure_in_method(self, method, error): 4989 handler = FakeHandler(0, self.called) 4990 setattr(handler, method, self.raise_error(error)) 4991 handlers = [logging.weakref.ref(handler)] 4992 4993 logging.shutdown(handlerList=list(handlers)) 4994 4995 self.assertEqual('0 - release', self.called[-1]) 4996 4997 def test_with_ioerror_in_acquire(self): 4998 self._test_with_failure_in_method('acquire', OSError) 4999 5000 def test_with_ioerror_in_flush(self): 5001 self._test_with_failure_in_method('flush', OSError) 5002 5003 def test_with_ioerror_in_close(self): 5004 self._test_with_failure_in_method('close', OSError) 5005 5006 def test_with_valueerror_in_acquire(self): 5007 self._test_with_failure_in_method('acquire', ValueError) 5008 5009 def test_with_valueerror_in_flush(self): 5010 self._test_with_failure_in_method('flush', ValueError) 5011 5012 def test_with_valueerror_in_close(self): 5013 self._test_with_failure_in_method('close', ValueError) 5014 5015 def test_with_other_error_in_acquire_without_raise(self): 5016 logging.raiseExceptions = False 5017 self._test_with_failure_in_method('acquire', IndexError) 5018 5019 def test_with_other_error_in_flush_without_raise(self): 5020 logging.raiseExceptions = False 5021 self._test_with_failure_in_method('flush', IndexError) 5022 5023 def test_with_other_error_in_close_without_raise(self): 5024 logging.raiseExceptions = False 5025 self._test_with_failure_in_method('close', IndexError) 5026 5027 def test_with_other_error_in_acquire_with_raise(self): 5028 logging.raiseExceptions = True 5029 self.assertRaises(IndexError, self._test_with_failure_in_method, 5030 'acquire', IndexError) 5031 5032 def test_with_other_error_in_flush_with_raise(self): 5033 logging.raiseExceptions = True 5034 self.assertRaises(IndexError, self._test_with_failure_in_method, 5035 'flush', IndexError) 5036 5037 def test_with_other_error_in_close_with_raise(self): 5038 logging.raiseExceptions = True 5039 self.assertRaises(IndexError, self._test_with_failure_in_method, 5040 'close', IndexError) 5041 5042 5043class ModuleLevelMiscTest(BaseTest): 5044 5045 """Test suite for some module level methods.""" 5046 5047 def test_disable(self): 5048 old_disable = logging.root.manager.disable 5049 # confirm our assumptions are correct 5050 self.assertEqual(old_disable, 0) 5051 self.addCleanup(logging.disable, old_disable) 5052 5053 logging.disable(83) 5054 self.assertEqual(logging.root.manager.disable, 83) 5055 5056 self.assertRaises(ValueError, logging.disable, "doesnotexists") 5057 5058 class _NotAnIntOrString: 5059 pass 5060 5061 self.assertRaises(TypeError, logging.disable, _NotAnIntOrString()) 5062 5063 logging.disable("WARN") 5064 5065 # test the default value introduced in 3.7 5066 # (Issue #28524) 5067 logging.disable() 5068 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 5069 5070 def _test_log(self, method, level=None): 5071 called = [] 5072 support.patch(self, logging, 'basicConfig', 5073 lambda *a, **kw: called.append((a, kw))) 5074 5075 recording = RecordingHandler() 5076 logging.root.addHandler(recording) 5077 5078 log_method = getattr(logging, method) 5079 if level is not None: 5080 log_method(level, "test me: %r", recording) 5081 else: 5082 log_method("test me: %r", recording) 5083 5084 self.assertEqual(len(recording.records), 1) 5085 record = recording.records[0] 5086 self.assertEqual(record.getMessage(), "test me: %r" % recording) 5087 5088 expected_level = level if level is not None else getattr(logging, method.upper()) 5089 self.assertEqual(record.levelno, expected_level) 5090 5091 # basicConfig was not called! 5092 self.assertEqual(called, []) 5093 5094 def test_log(self): 5095 self._test_log('log', logging.ERROR) 5096 5097 def test_debug(self): 5098 self._test_log('debug') 5099 5100 def test_info(self): 5101 self._test_log('info') 5102 5103 def test_warning(self): 5104 self._test_log('warning') 5105 5106 def test_error(self): 5107 self._test_log('error') 5108 5109 def test_critical(self): 5110 self._test_log('critical') 5111 5112 def test_set_logger_class(self): 5113 self.assertRaises(TypeError, logging.setLoggerClass, object) 5114 5115 class MyLogger(logging.Logger): 5116 pass 5117 5118 logging.setLoggerClass(MyLogger) 5119 self.assertEqual(logging.getLoggerClass(), MyLogger) 5120 5121 logging.setLoggerClass(logging.Logger) 5122 self.assertEqual(logging.getLoggerClass(), logging.Logger) 5123 5124 def test_subclass_logger_cache(self): 5125 # bpo-37258 5126 message = [] 5127 5128 class MyLogger(logging.getLoggerClass()): 5129 def __init__(self, name='MyLogger', level=logging.NOTSET): 5130 super().__init__(name, level) 5131 message.append('initialized') 5132 5133 logging.setLoggerClass(MyLogger) 5134 logger = logging.getLogger('just_some_logger') 5135 self.assertEqual(message, ['initialized']) 5136 stream = io.StringIO() 5137 h = logging.StreamHandler(stream) 5138 logger.addHandler(h) 5139 try: 5140 logger.setLevel(logging.DEBUG) 5141 logger.debug("hello") 5142 self.assertEqual(stream.getvalue().strip(), "hello") 5143 5144 stream.truncate(0) 5145 stream.seek(0) 5146 5147 logger.setLevel(logging.INFO) 5148 logger.debug("hello") 5149 self.assertEqual(stream.getvalue(), "") 5150 finally: 5151 logger.removeHandler(h) 5152 h.close() 5153 logging.setLoggerClass(logging.Logger) 5154 5155 def test_logging_at_shutdown(self): 5156 # bpo-20037: Doing text I/O late at interpreter shutdown must not crash 5157 code = textwrap.dedent(""" 5158 import logging 5159 5160 class A: 5161 def __del__(self): 5162 try: 5163 raise ValueError("some error") 5164 except Exception: 5165 logging.exception("exception in __del__") 5166 5167 a = A() 5168 """) 5169 rc, out, err = assert_python_ok("-c", code) 5170 err = err.decode() 5171 self.assertIn("exception in __del__", err) 5172 self.assertIn("ValueError: some error", err) 5173 5174 def test_logging_at_shutdown_open(self): 5175 # bpo-26789: FileHandler keeps a reference to the builtin open() 5176 # function to be able to open or reopen the file during Python 5177 # finalization. 5178 filename = os_helper.TESTFN 5179 self.addCleanup(os_helper.unlink, filename) 5180 5181 code = textwrap.dedent(f""" 5182 import builtins 5183 import logging 5184 5185 class A: 5186 def __del__(self): 5187 logging.error("log in __del__") 5188 5189 # basicConfig() opens the file, but logging.shutdown() closes 5190 # it at Python exit. When A.__del__() is called, 5191 # FileHandler._open() must be called again to re-open the file. 5192 logging.basicConfig(filename={filename!r}, encoding="utf-8") 5193 5194 a = A() 5195 5196 # Simulate the Python finalization which removes the builtin 5197 # open() function. 5198 del builtins.open 5199 """) 5200 assert_python_ok("-c", code) 5201 5202 with open(filename, encoding="utf-8") as fp: 5203 self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__") 5204 5205 def test_recursion_error(self): 5206 # Issue 36272 5207 code = textwrap.dedent(""" 5208 import logging 5209 5210 def rec(): 5211 logging.error("foo") 5212 rec() 5213 5214 rec() 5215 """) 5216 rc, out, err = assert_python_failure("-c", code) 5217 err = err.decode() 5218 self.assertNotIn("Cannot recover from stack overflow.", err) 5219 self.assertEqual(rc, 1) 5220 5221 def test_get_level_names_mapping(self): 5222 mapping = logging.getLevelNamesMapping() 5223 self.assertEqual(logging._nameToLevel, mapping) # value is equivalent 5224 self.assertIsNot(logging._nameToLevel, mapping) # but not the internal data 5225 new_mapping = logging.getLevelNamesMapping() # another call -> another copy 5226 self.assertIsNot(mapping, new_mapping) # verify not the same object as before 5227 self.assertEqual(mapping, new_mapping) # but equivalent in value 5228 5229 5230class LogRecordTest(BaseTest): 5231 def test_str_rep(self): 5232 r = logging.makeLogRecord({}) 5233 s = str(r) 5234 self.assertTrue(s.startswith('<LogRecord: ')) 5235 self.assertTrue(s.endswith('>')) 5236 5237 def test_dict_arg(self): 5238 h = RecordingHandler() 5239 r = logging.getLogger() 5240 r.addHandler(h) 5241 d = {'less' : 'more' } 5242 logging.warning('less is %(less)s', d) 5243 self.assertIs(h.records[0].args, d) 5244 self.assertEqual(h.records[0].message, 'less is more') 5245 r.removeHandler(h) 5246 h.close() 5247 5248 @staticmethod # pickled as target of child process in the following test 5249 def _extract_logrecord_process_name(key, logMultiprocessing, conn=None): 5250 prev_logMultiprocessing = logging.logMultiprocessing 5251 logging.logMultiprocessing = logMultiprocessing 5252 try: 5253 import multiprocessing as mp 5254 name = mp.current_process().name 5255 5256 r1 = logging.makeLogRecord({'msg': f'msg1_{key}'}) 5257 5258 # https://bugs.python.org/issue45128 5259 with support.swap_item(sys.modules, 'multiprocessing', None): 5260 r2 = logging.makeLogRecord({'msg': f'msg2_{key}'}) 5261 5262 results = {'processName' : name, 5263 'r1.processName': r1.processName, 5264 'r2.processName': r2.processName, 5265 } 5266 finally: 5267 logging.logMultiprocessing = prev_logMultiprocessing 5268 if conn: 5269 conn.send(results) 5270 else: 5271 return results 5272 5273 @skip_if_tsan_fork 5274 def test_multiprocessing(self): 5275 support.skip_if_broken_multiprocessing_synchronize() 5276 multiprocessing_imported = 'multiprocessing' in sys.modules 5277 try: 5278 # logMultiprocessing is True by default 5279 self.assertEqual(logging.logMultiprocessing, True) 5280 5281 LOG_MULTI_PROCESSING = True 5282 # When logMultiprocessing == True: 5283 # In the main process processName = 'MainProcess' 5284 r = logging.makeLogRecord({}) 5285 self.assertEqual(r.processName, 'MainProcess') 5286 5287 results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING) 5288 self.assertEqual('MainProcess', results['processName']) 5289 self.assertEqual('MainProcess', results['r1.processName']) 5290 self.assertEqual('MainProcess', results['r2.processName']) 5291 5292 # In other processes, processName is correct when multiprocessing in imported, 5293 # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762). 5294 import multiprocessing 5295 parent_conn, child_conn = multiprocessing.Pipe() 5296 p = multiprocessing.Process( 5297 target=self._extract_logrecord_process_name, 5298 args=(2, LOG_MULTI_PROCESSING, child_conn,) 5299 ) 5300 p.start() 5301 results = parent_conn.recv() 5302 self.assertNotEqual('MainProcess', results['processName']) 5303 self.assertEqual(results['processName'], results['r1.processName']) 5304 self.assertEqual('MainProcess', results['r2.processName']) 5305 p.join() 5306 5307 finally: 5308 if multiprocessing_imported: 5309 import multiprocessing 5310 5311 def test_optional(self): 5312 NONE = self.assertIsNone 5313 NOT_NONE = self.assertIsNotNone 5314 5315 r = logging.makeLogRecord({}) 5316 NOT_NONE(r.thread) 5317 NOT_NONE(r.threadName) 5318 NOT_NONE(r.process) 5319 NOT_NONE(r.processName) 5320 NONE(r.taskName) 5321 log_threads = logging.logThreads 5322 log_processes = logging.logProcesses 5323 log_multiprocessing = logging.logMultiprocessing 5324 log_asyncio_tasks = logging.logAsyncioTasks 5325 try: 5326 logging.logThreads = False 5327 logging.logProcesses = False 5328 logging.logMultiprocessing = False 5329 logging.logAsyncioTasks = False 5330 r = logging.makeLogRecord({}) 5331 5332 NONE(r.thread) 5333 NONE(r.threadName) 5334 NONE(r.process) 5335 NONE(r.processName) 5336 NONE(r.taskName) 5337 finally: 5338 logging.logThreads = log_threads 5339 logging.logProcesses = log_processes 5340 logging.logMultiprocessing = log_multiprocessing 5341 logging.logAsyncioTasks = log_asyncio_tasks 5342 5343 async def _make_record_async(self, assertion): 5344 r = logging.makeLogRecord({}) 5345 assertion(r.taskName) 5346 5347 @support.requires_working_socket() 5348 def test_taskName_with_asyncio_imported(self): 5349 try: 5350 make_record = self._make_record_async 5351 with asyncio.Runner() as runner: 5352 logging.logAsyncioTasks = True 5353 runner.run(make_record(self.assertIsNotNone)) 5354 logging.logAsyncioTasks = False 5355 runner.run(make_record(self.assertIsNone)) 5356 finally: 5357 asyncio.set_event_loop_policy(None) 5358 5359 @support.requires_working_socket() 5360 def test_taskName_without_asyncio_imported(self): 5361 try: 5362 make_record = self._make_record_async 5363 with asyncio.Runner() as runner, support.swap_item(sys.modules, 'asyncio', None): 5364 logging.logAsyncioTasks = True 5365 runner.run(make_record(self.assertIsNone)) 5366 logging.logAsyncioTasks = False 5367 runner.run(make_record(self.assertIsNone)) 5368 finally: 5369 asyncio.set_event_loop_policy(None) 5370 5371 5372class BasicConfigTest(unittest.TestCase): 5373 5374 """Test suite for logging.basicConfig.""" 5375 5376 def setUp(self): 5377 super(BasicConfigTest, self).setUp() 5378 self.handlers = logging.root.handlers 5379 self.saved_handlers = logging._handlers.copy() 5380 self.saved_handler_list = logging._handlerList[:] 5381 self.original_logging_level = logging.root.level 5382 self.addCleanup(self.cleanup) 5383 logging.root.handlers = [] 5384 5385 def tearDown(self): 5386 for h in logging.root.handlers[:]: 5387 logging.root.removeHandler(h) 5388 h.close() 5389 super(BasicConfigTest, self).tearDown() 5390 5391 def cleanup(self): 5392 setattr(logging.root, 'handlers', self.handlers) 5393 logging._handlers.clear() 5394 logging._handlers.update(self.saved_handlers) 5395 logging._handlerList[:] = self.saved_handler_list 5396 logging.root.setLevel(self.original_logging_level) 5397 5398 def test_no_kwargs(self): 5399 logging.basicConfig() 5400 5401 # handler defaults to a StreamHandler to sys.stderr 5402 self.assertEqual(len(logging.root.handlers), 1) 5403 handler = logging.root.handlers[0] 5404 self.assertIsInstance(handler, logging.StreamHandler) 5405 self.assertEqual(handler.stream, sys.stderr) 5406 5407 formatter = handler.formatter 5408 # format defaults to logging.BASIC_FORMAT 5409 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 5410 # datefmt defaults to None 5411 self.assertIsNone(formatter.datefmt) 5412 # style defaults to % 5413 self.assertIsInstance(formatter._style, logging.PercentStyle) 5414 5415 # level is not explicitly set 5416 self.assertEqual(logging.root.level, self.original_logging_level) 5417 5418 def test_strformatstyle(self): 5419 with support.captured_stdout() as output: 5420 logging.basicConfig(stream=sys.stdout, style="{") 5421 logging.error("Log an error") 5422 sys.stdout.seek(0) 5423 self.assertEqual(output.getvalue().strip(), 5424 "ERROR:root:Log an error") 5425 5426 def test_stringtemplatestyle(self): 5427 with support.captured_stdout() as output: 5428 logging.basicConfig(stream=sys.stdout, style="$") 5429 logging.error("Log an error") 5430 sys.stdout.seek(0) 5431 self.assertEqual(output.getvalue().strip(), 5432 "ERROR:root:Log an error") 5433 5434 def test_filename(self): 5435 5436 def cleanup(h1, h2, fn): 5437 h1.close() 5438 h2.close() 5439 os.remove(fn) 5440 5441 logging.basicConfig(filename='test.log', encoding='utf-8') 5442 5443 self.assertEqual(len(logging.root.handlers), 1) 5444 handler = logging.root.handlers[0] 5445 self.assertIsInstance(handler, logging.FileHandler) 5446 5447 expected = logging.FileHandler('test.log', 'a', encoding='utf-8') 5448 self.assertEqual(handler.stream.mode, expected.stream.mode) 5449 self.assertEqual(handler.stream.name, expected.stream.name) 5450 self.addCleanup(cleanup, handler, expected, 'test.log') 5451 5452 def test_filemode(self): 5453 5454 def cleanup(h1, h2, fn): 5455 h1.close() 5456 h2.close() 5457 os.remove(fn) 5458 5459 logging.basicConfig(filename='test.log', filemode='wb') 5460 5461 handler = logging.root.handlers[0] 5462 expected = logging.FileHandler('test.log', 'wb') 5463 self.assertEqual(handler.stream.mode, expected.stream.mode) 5464 self.addCleanup(cleanup, handler, expected, 'test.log') 5465 5466 def test_stream(self): 5467 stream = io.StringIO() 5468 self.addCleanup(stream.close) 5469 logging.basicConfig(stream=stream) 5470 5471 self.assertEqual(len(logging.root.handlers), 1) 5472 handler = logging.root.handlers[0] 5473 self.assertIsInstance(handler, logging.StreamHandler) 5474 self.assertEqual(handler.stream, stream) 5475 5476 def test_format(self): 5477 logging.basicConfig(format='%(asctime)s - %(message)s') 5478 5479 formatter = logging.root.handlers[0].formatter 5480 self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s') 5481 5482 def test_datefmt(self): 5483 logging.basicConfig(datefmt='bar') 5484 5485 formatter = logging.root.handlers[0].formatter 5486 self.assertEqual(formatter.datefmt, 'bar') 5487 5488 def test_style(self): 5489 logging.basicConfig(style='$') 5490 5491 formatter = logging.root.handlers[0].formatter 5492 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 5493 5494 def test_level(self): 5495 old_level = logging.root.level 5496 self.addCleanup(logging.root.setLevel, old_level) 5497 5498 logging.basicConfig(level=57) 5499 self.assertEqual(logging.root.level, 57) 5500 # Test that second call has no effect 5501 logging.basicConfig(level=58) 5502 self.assertEqual(logging.root.level, 57) 5503 5504 def test_incompatible(self): 5505 assertRaises = self.assertRaises 5506 handlers = [logging.StreamHandler()] 5507 stream = sys.stderr 5508 assertRaises(ValueError, logging.basicConfig, filename='test.log', 5509 stream=stream) 5510 assertRaises(ValueError, logging.basicConfig, filename='test.log', 5511 handlers=handlers) 5512 assertRaises(ValueError, logging.basicConfig, stream=stream, 5513 handlers=handlers) 5514 # Issue 23207: test for invalid kwargs 5515 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 5516 # Should pop both filename and filemode even if filename is None 5517 logging.basicConfig(filename=None, filemode='a') 5518 5519 def test_handlers(self): 5520 handlers = [ 5521 logging.StreamHandler(), 5522 logging.StreamHandler(sys.stdout), 5523 logging.StreamHandler(), 5524 ] 5525 f = logging.Formatter() 5526 handlers[2].setFormatter(f) 5527 logging.basicConfig(handlers=handlers) 5528 self.assertIs(handlers[0], logging.root.handlers[0]) 5529 self.assertIs(handlers[1], logging.root.handlers[1]) 5530 self.assertIs(handlers[2], logging.root.handlers[2]) 5531 self.assertIsNotNone(handlers[0].formatter) 5532 self.assertIsNotNone(handlers[1].formatter) 5533 self.assertIs(handlers[2].formatter, f) 5534 self.assertIs(handlers[0].formatter, handlers[1].formatter) 5535 5536 def test_force(self): 5537 old_string_io = io.StringIO() 5538 new_string_io = io.StringIO() 5539 old_handlers = [logging.StreamHandler(old_string_io)] 5540 new_handlers = [logging.StreamHandler(new_string_io)] 5541 logging.basicConfig(level=logging.WARNING, handlers=old_handlers) 5542 logging.warning('warn') 5543 logging.info('info') 5544 logging.debug('debug') 5545 self.assertEqual(len(logging.root.handlers), 1) 5546 logging.basicConfig(level=logging.INFO, handlers=new_handlers, 5547 force=True) 5548 logging.warning('warn') 5549 logging.info('info') 5550 logging.debug('debug') 5551 self.assertEqual(len(logging.root.handlers), 1) 5552 self.assertEqual(old_string_io.getvalue().strip(), 5553 'WARNING:root:warn') 5554 self.assertEqual(new_string_io.getvalue().strip(), 5555 'WARNING:root:warn\nINFO:root:info') 5556 5557 def test_encoding(self): 5558 try: 5559 encoding = 'utf-8' 5560 logging.basicConfig(filename='test.log', encoding=encoding, 5561 errors='strict', 5562 format='%(message)s', level=logging.DEBUG) 5563 5564 self.assertEqual(len(logging.root.handlers), 1) 5565 handler = logging.root.handlers[0] 5566 self.assertIsInstance(handler, logging.FileHandler) 5567 self.assertEqual(handler.encoding, encoding) 5568 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 5569 finally: 5570 handler.close() 5571 with open('test.log', encoding='utf-8') as f: 5572 data = f.read().strip() 5573 os.remove('test.log') 5574 self.assertEqual(data, 5575 'The Øresund Bridge joins Copenhagen to Malmö') 5576 5577 def test_encoding_errors(self): 5578 try: 5579 encoding = 'ascii' 5580 logging.basicConfig(filename='test.log', encoding=encoding, 5581 errors='ignore', 5582 format='%(message)s', level=logging.DEBUG) 5583 5584 self.assertEqual(len(logging.root.handlers), 1) 5585 handler = logging.root.handlers[0] 5586 self.assertIsInstance(handler, logging.FileHandler) 5587 self.assertEqual(handler.encoding, encoding) 5588 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 5589 finally: 5590 handler.close() 5591 with open('test.log', encoding='utf-8') as f: 5592 data = f.read().strip() 5593 os.remove('test.log') 5594 self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm') 5595 5596 def test_encoding_errors_default(self): 5597 try: 5598 encoding = 'ascii' 5599 logging.basicConfig(filename='test.log', encoding=encoding, 5600 format='%(message)s', level=logging.DEBUG) 5601 5602 self.assertEqual(len(logging.root.handlers), 1) 5603 handler = logging.root.handlers[0] 5604 self.assertIsInstance(handler, logging.FileHandler) 5605 self.assertEqual(handler.encoding, encoding) 5606 self.assertEqual(handler.errors, 'backslashreplace') 5607 logging.debug(': ☃️: The Øresund Bridge joins Copenhagen to Malmö') 5608 finally: 5609 handler.close() 5610 with open('test.log', encoding='utf-8') as f: 5611 data = f.read().strip() 5612 os.remove('test.log') 5613 self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund ' 5614 r'Bridge joins Copenhagen to Malm\xf6') 5615 5616 def test_encoding_errors_none(self): 5617 # Specifying None should behave as 'strict' 5618 try: 5619 encoding = 'ascii' 5620 logging.basicConfig(filename='test.log', encoding=encoding, 5621 errors=None, 5622 format='%(message)s', level=logging.DEBUG) 5623 5624 self.assertEqual(len(logging.root.handlers), 1) 5625 handler = logging.root.handlers[0] 5626 self.assertIsInstance(handler, logging.FileHandler) 5627 self.assertEqual(handler.encoding, encoding) 5628 self.assertIsNone(handler.errors) 5629 5630 message = [] 5631 5632 def dummy_handle_error(record): 5633 message.append(str(sys.exception())) 5634 5635 handler.handleError = dummy_handle_error 5636 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 5637 self.assertTrue(message) 5638 self.assertIn("'ascii' codec can't encode " 5639 "character '\\xd8' in position 4:", message[0]) 5640 finally: 5641 handler.close() 5642 with open('test.log', encoding='utf-8') as f: 5643 data = f.read().strip() 5644 os.remove('test.log') 5645 # didn't write anything due to the encoding error 5646 self.assertEqual(data, r'') 5647 5648 @support.requires_working_socket() 5649 def test_log_taskName(self): 5650 async def log_record(): 5651 logging.warning('hello world') 5652 5653 handler = None 5654 log_filename = make_temp_file('.log', 'test-logging-taskname-') 5655 self.addCleanup(os.remove, log_filename) 5656 try: 5657 encoding = 'utf-8' 5658 logging.basicConfig(filename=log_filename, errors='strict', 5659 encoding=encoding, level=logging.WARNING, 5660 format='%(taskName)s - %(message)s') 5661 5662 self.assertEqual(len(logging.root.handlers), 1) 5663 handler = logging.root.handlers[0] 5664 self.assertIsInstance(handler, logging.FileHandler) 5665 5666 with asyncio.Runner(debug=True) as runner: 5667 logging.logAsyncioTasks = True 5668 runner.run(log_record()) 5669 with open(log_filename, encoding='utf-8') as f: 5670 data = f.read().strip() 5671 self.assertRegex(data, r'Task-\d+ - hello world') 5672 finally: 5673 asyncio.set_event_loop_policy(None) 5674 if handler: 5675 handler.close() 5676 5677 5678 def _test_log(self, method, level=None): 5679 # logging.root has no handlers so basicConfig should be called 5680 called = [] 5681 5682 old_basic_config = logging.basicConfig 5683 def my_basic_config(*a, **kw): 5684 old_basic_config() 5685 old_level = logging.root.level 5686 logging.root.setLevel(100) # avoid having messages in stderr 5687 self.addCleanup(logging.root.setLevel, old_level) 5688 called.append((a, kw)) 5689 5690 support.patch(self, logging, 'basicConfig', my_basic_config) 5691 5692 log_method = getattr(logging, method) 5693 if level is not None: 5694 log_method(level, "test me") 5695 else: 5696 log_method("test me") 5697 5698 # basicConfig was called with no arguments 5699 self.assertEqual(called, [((), {})]) 5700 5701 def test_log(self): 5702 self._test_log('log', logging.WARNING) 5703 5704 def test_debug(self): 5705 self._test_log('debug') 5706 5707 def test_info(self): 5708 self._test_log('info') 5709 5710 def test_warning(self): 5711 self._test_log('warning') 5712 5713 def test_error(self): 5714 self._test_log('error') 5715 5716 def test_critical(self): 5717 self._test_log('critical') 5718 5719 5720class LoggerAdapterTest(unittest.TestCase): 5721 def setUp(self): 5722 super(LoggerAdapterTest, self).setUp() 5723 old_handler_list = logging._handlerList[:] 5724 5725 self.recording = RecordingHandler() 5726 self.logger = logging.root 5727 self.logger.addHandler(self.recording) 5728 self.addCleanup(self.logger.removeHandler, self.recording) 5729 self.addCleanup(self.recording.close) 5730 5731 def cleanup(): 5732 logging._handlerList[:] = old_handler_list 5733 5734 self.addCleanup(cleanup) 5735 self.addCleanup(logging.shutdown) 5736 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 5737 5738 def test_exception(self): 5739 msg = 'testing exception: %r' 5740 exc = None 5741 try: 5742 1 / 0 5743 except ZeroDivisionError as e: 5744 exc = e 5745 self.adapter.exception(msg, self.recording) 5746 5747 self.assertEqual(len(self.recording.records), 1) 5748 record = self.recording.records[0] 5749 self.assertEqual(record.levelno, logging.ERROR) 5750 self.assertEqual(record.msg, msg) 5751 self.assertEqual(record.args, (self.recording,)) 5752 self.assertEqual(record.exc_info, 5753 (exc.__class__, exc, exc.__traceback__)) 5754 5755 def test_exception_excinfo(self): 5756 try: 5757 1 / 0 5758 except ZeroDivisionError as e: 5759 exc = e 5760 5761 self.adapter.exception('exc_info test', exc_info=exc) 5762 5763 self.assertEqual(len(self.recording.records), 1) 5764 record = self.recording.records[0] 5765 self.assertEqual(record.exc_info, 5766 (exc.__class__, exc, exc.__traceback__)) 5767 5768 def test_critical(self): 5769 msg = 'critical test! %r' 5770 self.adapter.critical(msg, self.recording) 5771 5772 self.assertEqual(len(self.recording.records), 1) 5773 record = self.recording.records[0] 5774 self.assertEqual(record.levelno, logging.CRITICAL) 5775 self.assertEqual(record.msg, msg) 5776 self.assertEqual(record.args, (self.recording,)) 5777 self.assertEqual(record.funcName, 'test_critical') 5778 5779 def test_is_enabled_for(self): 5780 old_disable = self.adapter.logger.manager.disable 5781 self.adapter.logger.manager.disable = 33 5782 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 5783 old_disable) 5784 self.assertFalse(self.adapter.isEnabledFor(32)) 5785 5786 def test_has_handlers(self): 5787 self.assertTrue(self.adapter.hasHandlers()) 5788 5789 for handler in self.logger.handlers: 5790 self.logger.removeHandler(handler) 5791 5792 self.assertFalse(self.logger.hasHandlers()) 5793 self.assertFalse(self.adapter.hasHandlers()) 5794 5795 def test_nested(self): 5796 msg = 'Adapters can be nested, yo.' 5797 adapter = PrefixAdapter(logger=self.logger, extra=None) 5798 adapter_adapter = PrefixAdapter(logger=adapter, extra=None) 5799 adapter_adapter.prefix = 'AdapterAdapter' 5800 self.assertEqual(repr(adapter), repr(adapter_adapter)) 5801 adapter_adapter.log(logging.CRITICAL, msg, self.recording) 5802 self.assertEqual(len(self.recording.records), 1) 5803 record = self.recording.records[0] 5804 self.assertEqual(record.levelno, logging.CRITICAL) 5805 self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}") 5806 self.assertEqual(record.args, (self.recording,)) 5807 self.assertEqual(record.funcName, 'test_nested') 5808 orig_manager = adapter_adapter.manager 5809 self.assertIs(adapter.manager, orig_manager) 5810 self.assertIs(self.logger.manager, orig_manager) 5811 temp_manager = object() 5812 try: 5813 adapter_adapter.manager = temp_manager 5814 self.assertIs(adapter_adapter.manager, temp_manager) 5815 self.assertIs(adapter.manager, temp_manager) 5816 self.assertIs(self.logger.manager, temp_manager) 5817 finally: 5818 adapter_adapter.manager = orig_manager 5819 self.assertIs(adapter_adapter.manager, orig_manager) 5820 self.assertIs(adapter.manager, orig_manager) 5821 self.assertIs(self.logger.manager, orig_manager) 5822 5823 def test_styled_adapter(self): 5824 # Test an example from the Cookbook. 5825 records = self.recording.records 5826 adapter = StyleAdapter(self.logger) 5827 adapter.warning('Hello, {}!', 'world') 5828 self.assertEqual(str(records[-1].msg), 'Hello, world!') 5829 self.assertEqual(records[-1].funcName, 'test_styled_adapter') 5830 adapter.log(logging.WARNING, 'Goodbye {}.', 'world') 5831 self.assertEqual(str(records[-1].msg), 'Goodbye world.') 5832 self.assertEqual(records[-1].funcName, 'test_styled_adapter') 5833 5834 def test_nested_styled_adapter(self): 5835 records = self.recording.records 5836 adapter = PrefixAdapter(self.logger) 5837 adapter.prefix = '{}' 5838 adapter2 = StyleAdapter(adapter) 5839 adapter2.warning('Hello, {}!', 'world') 5840 self.assertEqual(str(records[-1].msg), '{} Hello, world!') 5841 self.assertEqual(records[-1].funcName, 'test_nested_styled_adapter') 5842 adapter2.log(logging.WARNING, 'Goodbye {}.', 'world') 5843 self.assertEqual(str(records[-1].msg), '{} Goodbye world.') 5844 self.assertEqual(records[-1].funcName, 'test_nested_styled_adapter') 5845 5846 def test_find_caller_with_stacklevel(self): 5847 the_level = 1 5848 trigger = self.adapter.warning 5849 5850 def innermost(): 5851 trigger('test', stacklevel=the_level) 5852 5853 def inner(): 5854 innermost() 5855 5856 def outer(): 5857 inner() 5858 5859 records = self.recording.records 5860 outer() 5861 self.assertEqual(records[-1].funcName, 'innermost') 5862 lineno = records[-1].lineno 5863 the_level += 1 5864 outer() 5865 self.assertEqual(records[-1].funcName, 'inner') 5866 self.assertGreater(records[-1].lineno, lineno) 5867 lineno = records[-1].lineno 5868 the_level += 1 5869 outer() 5870 self.assertEqual(records[-1].funcName, 'outer') 5871 self.assertGreater(records[-1].lineno, lineno) 5872 lineno = records[-1].lineno 5873 the_level += 1 5874 outer() 5875 self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') 5876 self.assertGreater(records[-1].lineno, lineno) 5877 5878 def test_extra_in_records(self): 5879 self.adapter = logging.LoggerAdapter(logger=self.logger, 5880 extra={'foo': '1'}) 5881 5882 self.adapter.critical('foo should be here') 5883 self.assertEqual(len(self.recording.records), 1) 5884 record = self.recording.records[0] 5885 self.assertTrue(hasattr(record, 'foo')) 5886 self.assertEqual(record.foo, '1') 5887 5888 def test_extra_not_merged_by_default(self): 5889 self.adapter.critical('foo should NOT be here', extra={'foo': 'nope'}) 5890 self.assertEqual(len(self.recording.records), 1) 5891 record = self.recording.records[0] 5892 self.assertFalse(hasattr(record, 'foo')) 5893 5894 def test_extra_merged(self): 5895 self.adapter = logging.LoggerAdapter(logger=self.logger, 5896 extra={'foo': '1'}, 5897 merge_extra=True) 5898 5899 self.adapter.critical('foo and bar should be here', extra={'bar': '2'}) 5900 self.assertEqual(len(self.recording.records), 1) 5901 record = self.recording.records[0] 5902 self.assertTrue(hasattr(record, 'foo')) 5903 self.assertTrue(hasattr(record, 'bar')) 5904 self.assertEqual(record.foo, '1') 5905 self.assertEqual(record.bar, '2') 5906 5907 def test_extra_merged_log_call_has_precedence(self): 5908 self.adapter = logging.LoggerAdapter(logger=self.logger, 5909 extra={'foo': '1'}, 5910 merge_extra=True) 5911 5912 self.adapter.critical('foo shall be min', extra={'foo': '2'}) 5913 self.assertEqual(len(self.recording.records), 1) 5914 record = self.recording.records[0] 5915 self.assertTrue(hasattr(record, 'foo')) 5916 self.assertEqual(record.foo, '2') 5917 5918 5919class PrefixAdapter(logging.LoggerAdapter): 5920 prefix = 'Adapter' 5921 5922 def process(self, msg, kwargs): 5923 return f"{self.prefix} {msg}", kwargs 5924 5925 5926class Message: 5927 def __init__(self, fmt, args): 5928 self.fmt = fmt 5929 self.args = args 5930 5931 def __str__(self): 5932 return self.fmt.format(*self.args) 5933 5934 5935class StyleAdapter(logging.LoggerAdapter): 5936 def log(self, level, msg, /, *args, stacklevel=1, **kwargs): 5937 if self.isEnabledFor(level): 5938 msg, kwargs = self.process(msg, kwargs) 5939 self.logger.log(level, Message(msg, args), **kwargs, 5940 stacklevel=stacklevel+1) 5941 5942 5943class LoggerTest(BaseTest, AssertErrorMessage): 5944 5945 def setUp(self): 5946 super(LoggerTest, self).setUp() 5947 self.recording = RecordingHandler() 5948 self.logger = logging.Logger(name='blah') 5949 self.logger.addHandler(self.recording) 5950 self.addCleanup(self.logger.removeHandler, self.recording) 5951 self.addCleanup(self.recording.close) 5952 self.addCleanup(logging.shutdown) 5953 5954 def test_set_invalid_level(self): 5955 self.assert_error_message( 5956 TypeError, 'Level not an integer or a valid string: None', 5957 self.logger.setLevel, None) 5958 self.assert_error_message( 5959 TypeError, 'Level not an integer or a valid string: (0, 0)', 5960 self.logger.setLevel, (0, 0)) 5961 5962 def test_exception(self): 5963 msg = 'testing exception: %r' 5964 exc = None 5965 try: 5966 1 / 0 5967 except ZeroDivisionError as e: 5968 exc = e 5969 self.logger.exception(msg, self.recording) 5970 5971 self.assertEqual(len(self.recording.records), 1) 5972 record = self.recording.records[0] 5973 self.assertEqual(record.levelno, logging.ERROR) 5974 self.assertEqual(record.msg, msg) 5975 self.assertEqual(record.args, (self.recording,)) 5976 self.assertEqual(record.exc_info, 5977 (exc.__class__, exc, exc.__traceback__)) 5978 5979 def test_log_invalid_level_with_raise(self): 5980 with support.swap_attr(logging, 'raiseExceptions', True): 5981 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 5982 5983 def test_log_invalid_level_no_raise(self): 5984 with support.swap_attr(logging, 'raiseExceptions', False): 5985 self.logger.log('10', 'test message') # no exception happens 5986 5987 def test_find_caller_with_stack_info(self): 5988 called = [] 5989 support.patch(self, logging.traceback, 'print_stack', 5990 lambda f, file: called.append(file.getvalue())) 5991 5992 self.logger.findCaller(stack_info=True) 5993 5994 self.assertEqual(len(called), 1) 5995 self.assertEqual('Stack (most recent call last):\n', called[0]) 5996 5997 def test_find_caller_with_stacklevel(self): 5998 the_level = 1 5999 trigger = self.logger.warning 6000 6001 def innermost(): 6002 trigger('test', stacklevel=the_level) 6003 6004 def inner(): 6005 innermost() 6006 6007 def outer(): 6008 inner() 6009 6010 records = self.recording.records 6011 outer() 6012 self.assertEqual(records[-1].funcName, 'innermost') 6013 lineno = records[-1].lineno 6014 the_level += 1 6015 outer() 6016 self.assertEqual(records[-1].funcName, 'inner') 6017 self.assertGreater(records[-1].lineno, lineno) 6018 lineno = records[-1].lineno 6019 the_level += 1 6020 outer() 6021 self.assertEqual(records[-1].funcName, 'outer') 6022 self.assertGreater(records[-1].lineno, lineno) 6023 lineno = records[-1].lineno 6024 root_logger = logging.getLogger() 6025 root_logger.addHandler(self.recording) 6026 trigger = logging.warning 6027 outer() 6028 self.assertEqual(records[-1].funcName, 'outer') 6029 root_logger.removeHandler(self.recording) 6030 trigger = self.logger.warning 6031 the_level += 1 6032 outer() 6033 self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') 6034 self.assertGreater(records[-1].lineno, lineno) 6035 6036 def test_make_record_with_extra_overwrite(self): 6037 name = 'my record' 6038 level = 13 6039 fn = lno = msg = args = exc_info = func = sinfo = None 6040 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 6041 exc_info, func, sinfo) 6042 6043 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 6044 extra = {key: 'some value'} 6045 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 6046 fn, lno, msg, args, exc_info, 6047 extra=extra, sinfo=sinfo) 6048 6049 def test_make_record_with_extra_no_overwrite(self): 6050 name = 'my record' 6051 level = 13 6052 fn = lno = msg = args = exc_info = func = sinfo = None 6053 extra = {'valid_key': 'some value'} 6054 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 6055 exc_info, extra=extra, sinfo=sinfo) 6056 self.assertIn('valid_key', result.__dict__) 6057 6058 def test_has_handlers(self): 6059 self.assertTrue(self.logger.hasHandlers()) 6060 6061 for handler in self.logger.handlers: 6062 self.logger.removeHandler(handler) 6063 self.assertFalse(self.logger.hasHandlers()) 6064 6065 def test_has_handlers_no_propagate(self): 6066 child_logger = logging.getLogger('blah.child') 6067 child_logger.propagate = False 6068 self.assertFalse(child_logger.hasHandlers()) 6069 6070 def test_is_enabled_for(self): 6071 old_disable = self.logger.manager.disable 6072 self.logger.manager.disable = 23 6073 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 6074 self.assertFalse(self.logger.isEnabledFor(22)) 6075 6076 def test_is_enabled_for_disabled_logger(self): 6077 old_disabled = self.logger.disabled 6078 old_disable = self.logger.manager.disable 6079 6080 self.logger.disabled = True 6081 self.logger.manager.disable = 21 6082 6083 self.addCleanup(setattr, self.logger, 'disabled', old_disabled) 6084 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 6085 6086 self.assertFalse(self.logger.isEnabledFor(22)) 6087 6088 def test_root_logger_aliases(self): 6089 root = logging.getLogger() 6090 self.assertIs(root, logging.root) 6091 self.assertIs(root, logging.getLogger(None)) 6092 self.assertIs(root, logging.getLogger('')) 6093 self.assertIs(root, logging.getLogger('root')) 6094 self.assertIs(root, logging.getLogger('foo').root) 6095 self.assertIs(root, logging.getLogger('foo.bar').root) 6096 self.assertIs(root, logging.getLogger('foo').parent) 6097 6098 self.assertIsNot(root, logging.getLogger('\0')) 6099 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 6100 6101 def test_invalid_names(self): 6102 self.assertRaises(TypeError, logging.getLogger, any) 6103 self.assertRaises(TypeError, logging.getLogger, b'foo') 6104 6105 def test_pickling(self): 6106 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 6107 for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'): 6108 logger = logging.getLogger(name) 6109 s = pickle.dumps(logger, proto) 6110 unpickled = pickle.loads(s) 6111 self.assertIs(unpickled, logger) 6112 6113 def test_caching(self): 6114 root = self.root_logger 6115 logger1 = logging.getLogger("abc") 6116 logger2 = logging.getLogger("abc.def") 6117 6118 # Set root logger level and ensure cache is empty 6119 root.setLevel(logging.ERROR) 6120 self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR) 6121 self.assertEqual(logger2._cache, {}) 6122 6123 # Ensure cache is populated and calls are consistent 6124 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 6125 self.assertFalse(logger2.isEnabledFor(logging.DEBUG)) 6126 self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False}) 6127 self.assertEqual(root._cache, {}) 6128 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 6129 6130 # Ensure root cache gets populated 6131 self.assertEqual(root._cache, {}) 6132 self.assertTrue(root.isEnabledFor(logging.ERROR)) 6133 self.assertEqual(root._cache, {logging.ERROR: True}) 6134 6135 # Set parent logger level and ensure caches are emptied 6136 logger1.setLevel(logging.CRITICAL) 6137 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 6138 self.assertEqual(logger2._cache, {}) 6139 6140 # Ensure logger2 uses parent logger's effective level 6141 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 6142 6143 # Set level to NOTSET and ensure caches are empty 6144 logger2.setLevel(logging.NOTSET) 6145 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 6146 self.assertEqual(logger2._cache, {}) 6147 self.assertEqual(logger1._cache, {}) 6148 self.assertEqual(root._cache, {}) 6149 6150 # Verify logger2 follows parent and not root 6151 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 6152 self.assertTrue(logger2.isEnabledFor(logging.CRITICAL)) 6153 self.assertFalse(logger1.isEnabledFor(logging.ERROR)) 6154 self.assertTrue(logger1.isEnabledFor(logging.CRITICAL)) 6155 self.assertTrue(root.isEnabledFor(logging.ERROR)) 6156 6157 # Disable logging in manager and ensure caches are clear 6158 logging.disable() 6159 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 6160 self.assertEqual(logger2._cache, {}) 6161 self.assertEqual(logger1._cache, {}) 6162 self.assertEqual(root._cache, {}) 6163 6164 # Ensure no loggers are enabled 6165 self.assertFalse(logger1.isEnabledFor(logging.CRITICAL)) 6166 self.assertFalse(logger2.isEnabledFor(logging.CRITICAL)) 6167 self.assertFalse(root.isEnabledFor(logging.CRITICAL)) 6168 6169 6170class BaseFileTest(BaseTest): 6171 "Base class for handler tests that write log files" 6172 6173 def setUp(self): 6174 BaseTest.setUp(self) 6175 self.fn = make_temp_file(".log", "test_logging-2-") 6176 self.rmfiles = [] 6177 6178 def tearDown(self): 6179 for fn in self.rmfiles: 6180 os.unlink(fn) 6181 if os.path.exists(self.fn): 6182 os.unlink(self.fn) 6183 BaseTest.tearDown(self) 6184 6185 def assertLogFile(self, filename): 6186 "Assert a log file is there and register it for deletion" 6187 self.assertTrue(os.path.exists(filename), 6188 msg="Log file %r does not exist" % filename) 6189 self.rmfiles.append(filename) 6190 6191 def next_rec(self): 6192 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 6193 self.next_message(), None, None, None) 6194 6195class FileHandlerTest(BaseFileTest): 6196 def test_delay(self): 6197 os.unlink(self.fn) 6198 fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True) 6199 self.assertIsNone(fh.stream) 6200 self.assertFalse(os.path.exists(self.fn)) 6201 fh.handle(logging.makeLogRecord({})) 6202 self.assertIsNotNone(fh.stream) 6203 self.assertTrue(os.path.exists(self.fn)) 6204 fh.close() 6205 6206 def test_emit_after_closing_in_write_mode(self): 6207 # Issue #42378 6208 os.unlink(self.fn) 6209 fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w') 6210 fh.setFormatter(logging.Formatter('%(message)s')) 6211 fh.emit(self.next_rec()) # '1' 6212 fh.close() 6213 fh.emit(self.next_rec()) # '2' 6214 with open(self.fn) as fp: 6215 self.assertEqual(fp.read().strip(), '1') 6216 6217class RotatingFileHandlerTest(BaseFileTest): 6218 def test_should_not_rollover(self): 6219 # If file is empty rollover never occurs 6220 rh = logging.handlers.RotatingFileHandler( 6221 self.fn, encoding="utf-8", maxBytes=1) 6222 self.assertFalse(rh.shouldRollover(None)) 6223 rh.close() 6224 6225 # If maxBytes is zero rollover never occurs 6226 rh = logging.handlers.RotatingFileHandler( 6227 self.fn, encoding="utf-8", maxBytes=0) 6228 self.assertFalse(rh.shouldRollover(None)) 6229 rh.close() 6230 6231 with open(self.fn, 'wb') as f: 6232 f.write(b'\n') 6233 rh = logging.handlers.RotatingFileHandler( 6234 self.fn, encoding="utf-8", maxBytes=0) 6235 self.assertFalse(rh.shouldRollover(None)) 6236 rh.close() 6237 6238 @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.") 6239 def test_should_not_rollover_non_file(self): 6240 # bpo-45401 - test with special file 6241 # We set maxBytes to 1 so that rollover would normally happen, except 6242 # for the check for regular files 6243 rh = logging.handlers.RotatingFileHandler( 6244 os.devnull, encoding="utf-8", maxBytes=1) 6245 self.assertFalse(rh.shouldRollover(self.next_rec())) 6246 rh.close() 6247 6248 def test_should_rollover(self): 6249 with open(self.fn, 'wb') as f: 6250 f.write(b'\n') 6251 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=2) 6252 self.assertTrue(rh.shouldRollover(self.next_rec())) 6253 rh.close() 6254 6255 def test_file_created(self): 6256 # checks that the file is created and assumes it was created 6257 # by us 6258 os.unlink(self.fn) 6259 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8") 6260 rh.emit(self.next_rec()) 6261 self.assertLogFile(self.fn) 6262 rh.close() 6263 6264 def test_max_bytes(self, delay=False): 6265 kwargs = {'delay': delay} if delay else {} 6266 os.unlink(self.fn) 6267 rh = logging.handlers.RotatingFileHandler( 6268 self.fn, encoding="utf-8", backupCount=2, maxBytes=100, **kwargs) 6269 self.assertIs(os.path.exists(self.fn), not delay) 6270 small = logging.makeLogRecord({'msg': 'a'}) 6271 large = logging.makeLogRecord({'msg': 'b'*100}) 6272 self.assertFalse(rh.shouldRollover(small)) 6273 self.assertFalse(rh.shouldRollover(large)) 6274 rh.emit(small) 6275 self.assertLogFile(self.fn) 6276 self.assertFalse(os.path.exists(self.fn + ".1")) 6277 self.assertFalse(rh.shouldRollover(small)) 6278 self.assertTrue(rh.shouldRollover(large)) 6279 rh.emit(large) 6280 self.assertTrue(os.path.exists(self.fn)) 6281 self.assertLogFile(self.fn + ".1") 6282 self.assertFalse(os.path.exists(self.fn + ".2")) 6283 self.assertTrue(rh.shouldRollover(small)) 6284 self.assertTrue(rh.shouldRollover(large)) 6285 rh.close() 6286 6287 def test_max_bytes_delay(self): 6288 self.test_max_bytes(delay=True) 6289 6290 def test_rollover_filenames(self): 6291 def namer(name): 6292 return name + ".test" 6293 rh = logging.handlers.RotatingFileHandler( 6294 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 6295 rh.namer = namer 6296 rh.emit(self.next_rec()) 6297 self.assertLogFile(self.fn) 6298 self.assertFalse(os.path.exists(namer(self.fn + ".1"))) 6299 rh.emit(self.next_rec()) 6300 self.assertLogFile(namer(self.fn + ".1")) 6301 self.assertFalse(os.path.exists(namer(self.fn + ".2"))) 6302 rh.emit(self.next_rec()) 6303 self.assertLogFile(namer(self.fn + ".2")) 6304 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 6305 rh.emit(self.next_rec()) 6306 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 6307 rh.close() 6308 6309 def test_namer_rotator_inheritance(self): 6310 class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler): 6311 def namer(self, name): 6312 return name + ".test" 6313 6314 def rotator(self, source, dest): 6315 if os.path.exists(source): 6316 os.replace(source, dest + ".rotated") 6317 6318 rh = HandlerWithNamerAndRotator( 6319 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 6320 self.assertEqual(rh.namer(self.fn), self.fn + ".test") 6321 rh.emit(self.next_rec()) 6322 self.assertLogFile(self.fn) 6323 rh.emit(self.next_rec()) 6324 self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated") 6325 self.assertFalse(os.path.exists(rh.namer(self.fn + ".1"))) 6326 rh.close() 6327 6328 @support.requires_zlib() 6329 def test_rotator(self): 6330 def namer(name): 6331 return name + ".gz" 6332 6333 def rotator(source, dest): 6334 with open(source, "rb") as sf: 6335 data = sf.read() 6336 compressed = zlib.compress(data, 9) 6337 with open(dest, "wb") as df: 6338 df.write(compressed) 6339 os.remove(source) 6340 6341 rh = logging.handlers.RotatingFileHandler( 6342 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 6343 rh.rotator = rotator 6344 rh.namer = namer 6345 m1 = self.next_rec() 6346 rh.emit(m1) 6347 self.assertLogFile(self.fn) 6348 m2 = self.next_rec() 6349 rh.emit(m2) 6350 fn = namer(self.fn + ".1") 6351 self.assertLogFile(fn) 6352 newline = os.linesep 6353 with open(fn, "rb") as f: 6354 compressed = f.read() 6355 data = zlib.decompress(compressed) 6356 self.assertEqual(data.decode("ascii"), m1.msg + newline) 6357 rh.emit(self.next_rec()) 6358 fn = namer(self.fn + ".2") 6359 self.assertLogFile(fn) 6360 with open(fn, "rb") as f: 6361 compressed = f.read() 6362 data = zlib.decompress(compressed) 6363 self.assertEqual(data.decode("ascii"), m1.msg + newline) 6364 rh.emit(self.next_rec()) 6365 fn = namer(self.fn + ".2") 6366 with open(fn, "rb") as f: 6367 compressed = f.read() 6368 data = zlib.decompress(compressed) 6369 self.assertEqual(data.decode("ascii"), m2.msg + newline) 6370 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 6371 rh.close() 6372 6373class TimedRotatingFileHandlerTest(BaseFileTest): 6374 @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.") 6375 def test_should_not_rollover(self): 6376 # See bpo-45401. Should only ever rollover regular files 6377 fh = logging.handlers.TimedRotatingFileHandler( 6378 os.devnull, 'S', encoding="utf-8", backupCount=1) 6379 time.sleep(1.1) # a little over a second ... 6380 r = logging.makeLogRecord({'msg': 'testing - device file'}) 6381 self.assertFalse(fh.shouldRollover(r)) 6382 fh.close() 6383 6384 # other test methods added below 6385 def test_rollover(self): 6386 fh = logging.handlers.TimedRotatingFileHandler( 6387 self.fn, 'S', encoding="utf-8", backupCount=1) 6388 fmt = logging.Formatter('%(asctime)s %(message)s') 6389 fh.setFormatter(fmt) 6390 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 6391 fh.emit(r1) 6392 self.assertLogFile(self.fn) 6393 time.sleep(1.1) # a little over a second ... 6394 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 6395 fh.emit(r2) 6396 fh.close() 6397 # At this point, we should have a recent rotated file which we 6398 # can test for the existence of. However, in practice, on some 6399 # machines which run really slowly, we don't know how far back 6400 # in time to go to look for the log file. So, we go back a fair 6401 # bit, and stop as soon as we see a rotated file. In theory this 6402 # could of course still fail, but the chances are lower. 6403 found = False 6404 now = datetime.datetime.now() 6405 GO_BACK = 5 * 60 # seconds 6406 for secs in range(GO_BACK): 6407 prev = now - datetime.timedelta(seconds=secs) 6408 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 6409 found = os.path.exists(fn) 6410 if found: 6411 self.rmfiles.append(fn) 6412 break 6413 msg = 'No rotated files found, went back %d seconds' % GO_BACK 6414 if not found: 6415 # print additional diagnostics 6416 dn, fn = os.path.split(self.fn) 6417 files = [f for f in os.listdir(dn) if f.startswith(fn)] 6418 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 6419 print('The only matching files are: %s' % files, file=sys.stderr) 6420 for f in files: 6421 print('Contents of %s:' % f) 6422 path = os.path.join(dn, f) 6423 with open(path, 'r') as tf: 6424 print(tf.read()) 6425 self.assertTrue(found, msg=msg) 6426 6427 def test_rollover_at_midnight(self, weekly=False): 6428 os_helper.unlink(self.fn) 6429 now = datetime.datetime.now() 6430 atTime = now.time() 6431 if not 0.1 < atTime.microsecond/1e6 < 0.9: 6432 # The test requires all records to be emitted within 6433 # the range of the same whole second. 6434 time.sleep((0.1 - atTime.microsecond/1e6) % 1.0) 6435 now = datetime.datetime.now() 6436 atTime = now.time() 6437 atTime = atTime.replace(microsecond=0) 6438 fmt = logging.Formatter('%(asctime)s %(message)s') 6439 when = f'W{now.weekday()}' if weekly else 'MIDNIGHT' 6440 for i in range(3): 6441 fh = logging.handlers.TimedRotatingFileHandler( 6442 self.fn, encoding="utf-8", when=when, atTime=atTime) 6443 fh.setFormatter(fmt) 6444 r2 = logging.makeLogRecord({'msg': f'testing1 {i}'}) 6445 fh.emit(r2) 6446 fh.close() 6447 self.assertLogFile(self.fn) 6448 with open(self.fn, encoding="utf-8") as f: 6449 for i, line in enumerate(f): 6450 self.assertIn(f'testing1 {i}', line) 6451 6452 os.utime(self.fn, (now.timestamp() - 1,)*2) 6453 for i in range(2): 6454 fh = logging.handlers.TimedRotatingFileHandler( 6455 self.fn, encoding="utf-8", when=when, atTime=atTime) 6456 fh.setFormatter(fmt) 6457 r2 = logging.makeLogRecord({'msg': f'testing2 {i}'}) 6458 fh.emit(r2) 6459 fh.close() 6460 rolloverDate = now - datetime.timedelta(days=7 if weekly else 1) 6461 otherfn = f'{self.fn}.{rolloverDate:%Y-%m-%d}' 6462 self.assertLogFile(otherfn) 6463 with open(self.fn, encoding="utf-8") as f: 6464 for i, line in enumerate(f): 6465 self.assertIn(f'testing2 {i}', line) 6466 with open(otherfn, encoding="utf-8") as f: 6467 for i, line in enumerate(f): 6468 self.assertIn(f'testing1 {i}', line) 6469 6470 def test_rollover_at_weekday(self): 6471 self.test_rollover_at_midnight(weekly=True) 6472 6473 def test_invalid(self): 6474 assertRaises = self.assertRaises 6475 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 6476 self.fn, 'X', encoding="utf-8", delay=True) 6477 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 6478 self.fn, 'W', encoding="utf-8", delay=True) 6479 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 6480 self.fn, 'W7', encoding="utf-8", delay=True) 6481 6482 # TODO: Test for utc=False. 6483 def test_compute_rollover_daily_attime(self): 6484 currentTime = 0 6485 rh = logging.handlers.TimedRotatingFileHandler( 6486 self.fn, encoding="utf-8", when='MIDNIGHT', 6487 utc=True, atTime=None) 6488 try: 6489 actual = rh.computeRollover(currentTime) 6490 self.assertEqual(actual, currentTime + 24 * 60 * 60) 6491 6492 actual = rh.computeRollover(currentTime + 24 * 60 * 60 - 1) 6493 self.assertEqual(actual, currentTime + 24 * 60 * 60) 6494 6495 actual = rh.computeRollover(currentTime + 24 * 60 * 60) 6496 self.assertEqual(actual, currentTime + 48 * 60 * 60) 6497 6498 actual = rh.computeRollover(currentTime + 25 * 60 * 60) 6499 self.assertEqual(actual, currentTime + 48 * 60 * 60) 6500 finally: 6501 rh.close() 6502 6503 atTime = datetime.time(12, 0, 0) 6504 rh = logging.handlers.TimedRotatingFileHandler( 6505 self.fn, encoding="utf-8", when='MIDNIGHT', 6506 utc=True, atTime=atTime) 6507 try: 6508 actual = rh.computeRollover(currentTime) 6509 self.assertEqual(actual, currentTime + 12 * 60 * 60) 6510 6511 actual = rh.computeRollover(currentTime + 12 * 60 * 60 - 1) 6512 self.assertEqual(actual, currentTime + 12 * 60 * 60) 6513 6514 actual = rh.computeRollover(currentTime + 12 * 60 * 60) 6515 self.assertEqual(actual, currentTime + 36 * 60 * 60) 6516 6517 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 6518 self.assertEqual(actual, currentTime + 36 * 60 * 60) 6519 finally: 6520 rh.close() 6521 6522 # TODO: Test for utc=False. 6523 def test_compute_rollover_weekly_attime(self): 6524 currentTime = int(time.time()) 6525 today = currentTime - currentTime % 86400 6526 6527 atTime = datetime.time(12, 0, 0) 6528 6529 wday = time.gmtime(today).tm_wday 6530 for day in range(7): 6531 rh = logging.handlers.TimedRotatingFileHandler( 6532 self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0, 6533 utc=True, atTime=atTime) 6534 try: 6535 if wday > day: 6536 # The rollover day has already passed this week, so we 6537 # go over into next week 6538 expected = (7 - wday + day) 6539 else: 6540 expected = (day - wday) 6541 # At this point expected is in days from now, convert to seconds 6542 expected *= 24 * 60 * 60 6543 # Add in the rollover time 6544 expected += 12 * 60 * 60 6545 # Add in adjustment for today 6546 expected += today 6547 6548 actual = rh.computeRollover(today) 6549 if actual != expected: 6550 print('failed in timezone: %d' % time.timezone) 6551 print('local vars: %s' % locals()) 6552 self.assertEqual(actual, expected) 6553 6554 actual = rh.computeRollover(today + 12 * 60 * 60 - 1) 6555 if actual != expected: 6556 print('failed in timezone: %d' % time.timezone) 6557 print('local vars: %s' % locals()) 6558 self.assertEqual(actual, expected) 6559 6560 if day == wday: 6561 # goes into following week 6562 expected += 7 * 24 * 60 * 60 6563 actual = rh.computeRollover(today + 12 * 60 * 60) 6564 if actual != expected: 6565 print('failed in timezone: %d' % time.timezone) 6566 print('local vars: %s' % locals()) 6567 self.assertEqual(actual, expected) 6568 6569 actual = rh.computeRollover(today + 13 * 60 * 60) 6570 if actual != expected: 6571 print('failed in timezone: %d' % time.timezone) 6572 print('local vars: %s' % locals()) 6573 self.assertEqual(actual, expected) 6574 finally: 6575 rh.close() 6576 6577 def test_compute_files_to_delete(self): 6578 # See bpo-46063 for background 6579 wd = tempfile.mkdtemp(prefix='test_logging_') 6580 self.addCleanup(shutil.rmtree, wd) 6581 times = [] 6582 dt = datetime.datetime.now() 6583 for i in range(10): 6584 times.append(dt.strftime('%Y-%m-%d_%H-%M-%S')) 6585 dt += datetime.timedelta(seconds=5) 6586 prefixes = ('a.b', 'a.b.c', 'd.e', 'd.e.f', 'g') 6587 files = [] 6588 rotators = [] 6589 for prefix in prefixes: 6590 p = os.path.join(wd, '%s.log' % prefix) 6591 rotator = logging.handlers.TimedRotatingFileHandler(p, when='s', 6592 interval=5, 6593 backupCount=7, 6594 delay=True) 6595 rotators.append(rotator) 6596 if prefix.startswith('a.b'): 6597 for t in times: 6598 files.append('%s.log.%s' % (prefix, t)) 6599 elif prefix.startswith('d.e'): 6600 def namer(filename): 6601 dirname, basename = os.path.split(filename) 6602 basename = basename.replace('.log', '') + '.log' 6603 return os.path.join(dirname, basename) 6604 rotator.namer = namer 6605 for t in times: 6606 files.append('%s.%s.log' % (prefix, t)) 6607 elif prefix == 'g': 6608 def namer(filename): 6609 dirname, basename = os.path.split(filename) 6610 basename = 'g' + basename[6:] + '.oldlog' 6611 return os.path.join(dirname, basename) 6612 rotator.namer = namer 6613 for t in times: 6614 files.append('g%s.oldlog' % t) 6615 # Create empty files 6616 for fn in files: 6617 p = os.path.join(wd, fn) 6618 with open(p, 'wb') as f: 6619 pass 6620 # Now the checks that only the correct files are offered up for deletion 6621 for i, prefix in enumerate(prefixes): 6622 rotator = rotators[i] 6623 candidates = rotator.getFilesToDelete() 6624 self.assertEqual(len(candidates), 3, candidates) 6625 if prefix.startswith('a.b'): 6626 p = '%s.log.' % prefix 6627 for c in candidates: 6628 d, fn = os.path.split(c) 6629 self.assertTrue(fn.startswith(p)) 6630 elif prefix.startswith('d.e'): 6631 for c in candidates: 6632 d, fn = os.path.split(c) 6633 self.assertTrue(fn.endswith('.log'), fn) 6634 self.assertTrue(fn.startswith(prefix + '.') and 6635 fn[len(prefix) + 2].isdigit()) 6636 elif prefix == 'g': 6637 for c in candidates: 6638 d, fn = os.path.split(c) 6639 self.assertTrue(fn.endswith('.oldlog')) 6640 self.assertTrue(fn.startswith('g') and fn[1].isdigit()) 6641 6642 def test_compute_files_to_delete_same_filename_different_extensions(self): 6643 # See GH-93205 for background 6644 wd = pathlib.Path(tempfile.mkdtemp(prefix='test_logging_')) 6645 self.addCleanup(shutil.rmtree, wd) 6646 times = [] 6647 dt = datetime.datetime.now() 6648 n_files = 10 6649 for _ in range(n_files): 6650 times.append(dt.strftime('%Y-%m-%d_%H-%M-%S')) 6651 dt += datetime.timedelta(seconds=5) 6652 prefixes = ('a.log', 'a.log.b') 6653 files = [] 6654 rotators = [] 6655 for i, prefix in enumerate(prefixes): 6656 backupCount = i+1 6657 rotator = logging.handlers.TimedRotatingFileHandler(wd / prefix, when='s', 6658 interval=5, 6659 backupCount=backupCount, 6660 delay=True) 6661 rotators.append(rotator) 6662 for t in times: 6663 files.append('%s.%s' % (prefix, t)) 6664 for t in times: 6665 files.append('a.log.%s.c' % t) 6666 # Create empty files 6667 for f in files: 6668 (wd / f).touch() 6669 # Now the checks that only the correct files are offered up for deletion 6670 for i, prefix in enumerate(prefixes): 6671 backupCount = i+1 6672 rotator = rotators[i] 6673 candidates = rotator.getFilesToDelete() 6674 self.assertEqual(len(candidates), n_files - backupCount, candidates) 6675 matcher = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\Z") 6676 for c in candidates: 6677 d, fn = os.path.split(c) 6678 self.assertTrue(fn.startswith(prefix+'.')) 6679 suffix = fn[(len(prefix)+1):] 6680 self.assertRegex(suffix, matcher) 6681 6682 # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in 6683 # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0). 6684 @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0') 6685 def test_compute_rollover_MIDNIGHT_local(self): 6686 # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00. 6687 DT = datetime.datetime 6688 def test(current, expected): 6689 actual = fh.computeRollover(current.timestamp()) 6690 diff = actual - expected.timestamp() 6691 if diff: 6692 self.assertEqual(diff, 0, datetime.timedelta(seconds=diff)) 6693 6694 fh = logging.handlers.TimedRotatingFileHandler( 6695 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False) 6696 6697 test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 11, 0, 0)) 6698 test(DT(2012, 3, 11, 0, 0), DT(2012, 3, 12, 0, 0)) 6699 test(DT(2012, 3, 11, 1, 0), DT(2012, 3, 12, 0, 0)) 6700 6701 test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 4, 0, 0)) 6702 test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 5, 0, 0)) 6703 test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 5, 0, 0)) 6704 6705 fh.close() 6706 6707 fh = logging.handlers.TimedRotatingFileHandler( 6708 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, 6709 atTime=datetime.time(12, 0, 0)) 6710 6711 test(DT(2012, 3, 10, 11, 59, 59), DT(2012, 3, 10, 12, 0)) 6712 test(DT(2012, 3, 10, 12, 0), DT(2012, 3, 11, 12, 0)) 6713 test(DT(2012, 3, 10, 13, 0), DT(2012, 3, 11, 12, 0)) 6714 6715 test(DT(2012, 11, 3, 11, 59, 59), DT(2012, 11, 3, 12, 0)) 6716 test(DT(2012, 11, 3, 12, 0), DT(2012, 11, 4, 12, 0)) 6717 test(DT(2012, 11, 3, 13, 0), DT(2012, 11, 4, 12, 0)) 6718 6719 fh.close() 6720 6721 fh = logging.handlers.TimedRotatingFileHandler( 6722 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, 6723 atTime=datetime.time(2, 0, 0)) 6724 6725 test(DT(2012, 3, 10, 1, 59, 59), DT(2012, 3, 10, 2, 0)) 6726 # 2:00:00 is the same as 3:00:00 at 2012-3-11. 6727 test(DT(2012, 3, 10, 2, 0), DT(2012, 3, 11, 3, 0)) 6728 test(DT(2012, 3, 10, 3, 0), DT(2012, 3, 11, 3, 0)) 6729 6730 test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 0)) 6731 # No time between 2:00:00 and 3:00:00 at 2012-3-11. 6732 test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 12, 2, 0)) 6733 test(DT(2012, 3, 11, 4, 0), DT(2012, 3, 12, 2, 0)) 6734 6735 test(DT(2012, 11, 3, 1, 59, 59), DT(2012, 11, 3, 2, 0)) 6736 test(DT(2012, 11, 3, 2, 0), DT(2012, 11, 4, 2, 0)) 6737 test(DT(2012, 11, 3, 3, 0), DT(2012, 11, 4, 2, 0)) 6738 6739 # 1:00:00-2:00:00 is repeated twice at 2012-11-4. 6740 test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 4, 2, 0)) 6741 test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 4, 2, 0)) 6742 test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 5, 2, 0)) 6743 test(DT(2012, 11, 4, 3, 0), DT(2012, 11, 5, 2, 0)) 6744 6745 fh.close() 6746 6747 fh = logging.handlers.TimedRotatingFileHandler( 6748 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, 6749 atTime=datetime.time(2, 30, 0)) 6750 6751 test(DT(2012, 3, 10, 2, 29, 59), DT(2012, 3, 10, 2, 30)) 6752 # No time 2:30:00 at 2012-3-11. 6753 test(DT(2012, 3, 10, 2, 30), DT(2012, 3, 11, 3, 30)) 6754 test(DT(2012, 3, 10, 3, 0), DT(2012, 3, 11, 3, 30)) 6755 6756 test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 30)) 6757 # No time between 2:00:00 and 3:00:00 at 2012-3-11. 6758 test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 12, 2, 30)) 6759 test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 12, 2, 30)) 6760 6761 test(DT(2012, 11, 3, 2, 29, 59), DT(2012, 11, 3, 2, 30)) 6762 test(DT(2012, 11, 3, 2, 30), DT(2012, 11, 4, 2, 30)) 6763 test(DT(2012, 11, 3, 3, 0), DT(2012, 11, 4, 2, 30)) 6764 6765 fh.close() 6766 6767 fh = logging.handlers.TimedRotatingFileHandler( 6768 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, 6769 atTime=datetime.time(1, 30, 0)) 6770 6771 test(DT(2012, 3, 11, 1, 29, 59), DT(2012, 3, 11, 1, 30)) 6772 test(DT(2012, 3, 11, 1, 30), DT(2012, 3, 12, 1, 30)) 6773 test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 12, 1, 30)) 6774 # No time between 2:00:00 and 3:00:00 at 2012-3-11. 6775 test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 12, 1, 30)) 6776 test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 12, 1, 30)) 6777 6778 # 1:00:00-2:00:00 is repeated twice at 2012-11-4. 6779 test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 4, 1, 30)) 6780 test(DT(2012, 11, 4, 1, 29, 59), DT(2012, 11, 4, 1, 30)) 6781 test(DT(2012, 11, 4, 1, 30), DT(2012, 11, 5, 1, 30)) 6782 test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 5, 1, 30)) 6783 # It is weird, but the rollover date jumps back from 2012-11-5 6784 # to 2012-11-4. 6785 test(DT(2012, 11, 4, 1, 0, fold=1), DT(2012, 11, 4, 1, 30, fold=1)) 6786 test(DT(2012, 11, 4, 1, 29, 59, fold=1), DT(2012, 11, 4, 1, 30, fold=1)) 6787 test(DT(2012, 11, 4, 1, 30, fold=1), DT(2012, 11, 5, 1, 30)) 6788 test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 5, 1, 30)) 6789 test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 5, 1, 30)) 6790 test(DT(2012, 11, 4, 2, 30), DT(2012, 11, 5, 1, 30)) 6791 6792 fh.close() 6793 6794 # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in 6795 # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0). 6796 @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0') 6797 def test_compute_rollover_W6_local(self): 6798 # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00. 6799 DT = datetime.datetime 6800 def test(current, expected): 6801 actual = fh.computeRollover(current.timestamp()) 6802 diff = actual - expected.timestamp() 6803 if diff: 6804 self.assertEqual(diff, 0, datetime.timedelta(seconds=diff)) 6805 6806 fh = logging.handlers.TimedRotatingFileHandler( 6807 self.fn, encoding="utf-8", when='W6', utc=False) 6808 6809 test(DT(2012, 3, 4, 23, 59, 59), DT(2012, 3, 5, 0, 0)) 6810 test(DT(2012, 3, 5, 0, 0), DT(2012, 3, 12, 0, 0)) 6811 test(DT(2012, 3, 5, 1, 0), DT(2012, 3, 12, 0, 0)) 6812 6813 test(DT(2012, 10, 28, 23, 59, 59), DT(2012, 10, 29, 0, 0)) 6814 test(DT(2012, 10, 29, 0, 0), DT(2012, 11, 5, 0, 0)) 6815 test(DT(2012, 10, 29, 1, 0), DT(2012, 11, 5, 0, 0)) 6816 6817 fh.close() 6818 6819 fh = logging.handlers.TimedRotatingFileHandler( 6820 self.fn, encoding="utf-8", when='W6', utc=False, 6821 atTime=datetime.time(0, 0, 0)) 6822 6823 test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 11, 0, 0)) 6824 test(DT(2012, 3, 11, 0, 0), DT(2012, 3, 18, 0, 0)) 6825 test(DT(2012, 3, 11, 1, 0), DT(2012, 3, 18, 0, 0)) 6826 6827 test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 4, 0, 0)) 6828 test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 11, 0, 0)) 6829 test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 11, 0, 0)) 6830 6831 fh.close() 6832 6833 fh = logging.handlers.TimedRotatingFileHandler( 6834 self.fn, encoding="utf-8", when='W6', utc=False, 6835 atTime=datetime.time(12, 0, 0)) 6836 6837 test(DT(2012, 3, 4, 11, 59, 59), DT(2012, 3, 4, 12, 0)) 6838 test(DT(2012, 3, 4, 12, 0), DT(2012, 3, 11, 12, 0)) 6839 test(DT(2012, 3, 4, 13, 0), DT(2012, 3, 11, 12, 0)) 6840 6841 test(DT(2012, 10, 28, 11, 59, 59), DT(2012, 10, 28, 12, 0)) 6842 test(DT(2012, 10, 28, 12, 0), DT(2012, 11, 4, 12, 0)) 6843 test(DT(2012, 10, 28, 13, 0), DT(2012, 11, 4, 12, 0)) 6844 6845 fh.close() 6846 6847 fh = logging.handlers.TimedRotatingFileHandler( 6848 self.fn, encoding="utf-8", when='W6', utc=False, 6849 atTime=datetime.time(2, 0, 0)) 6850 6851 test(DT(2012, 3, 4, 1, 59, 59), DT(2012, 3, 4, 2, 0)) 6852 # 2:00:00 is the same as 3:00:00 at 2012-3-11. 6853 test(DT(2012, 3, 4, 2, 0), DT(2012, 3, 11, 3, 0)) 6854 test(DT(2012, 3, 4, 3, 0), DT(2012, 3, 11, 3, 0)) 6855 6856 test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 0)) 6857 # No time between 2:00:00 and 3:00:00 at 2012-3-11. 6858 test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 18, 2, 0)) 6859 test(DT(2012, 3, 11, 4, 0), DT(2012, 3, 18, 2, 0)) 6860 6861 test(DT(2012, 10, 28, 1, 59, 59), DT(2012, 10, 28, 2, 0)) 6862 test(DT(2012, 10, 28, 2, 0), DT(2012, 11, 4, 2, 0)) 6863 test(DT(2012, 10, 28, 3, 0), DT(2012, 11, 4, 2, 0)) 6864 6865 # 1:00:00-2:00:00 is repeated twice at 2012-11-4. 6866 test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 4, 2, 0)) 6867 test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 4, 2, 0)) 6868 test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 11, 2, 0)) 6869 test(DT(2012, 11, 4, 3, 0), DT(2012, 11, 11, 2, 0)) 6870 6871 fh.close() 6872 6873 fh = logging.handlers.TimedRotatingFileHandler( 6874 self.fn, encoding="utf-8", when='W6', utc=False, 6875 atTime=datetime.time(2, 30, 0)) 6876 6877 test(DT(2012, 3, 4, 2, 29, 59), DT(2012, 3, 4, 2, 30)) 6878 # No time 2:30:00 at 2012-3-11. 6879 test(DT(2012, 3, 4, 2, 30), DT(2012, 3, 11, 3, 30)) 6880 test(DT(2012, 3, 4, 3, 0), DT(2012, 3, 11, 3, 30)) 6881 6882 test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 11, 3, 30)) 6883 # No time between 2:00:00 and 3:00:00 at 2012-3-11. 6884 test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 18, 2, 30)) 6885 test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 18, 2, 30)) 6886 6887 test(DT(2012, 10, 28, 2, 29, 59), DT(2012, 10, 28, 2, 30)) 6888 test(DT(2012, 10, 28, 2, 30), DT(2012, 11, 4, 2, 30)) 6889 test(DT(2012, 10, 28, 3, 0), DT(2012, 11, 4, 2, 30)) 6890 6891 fh.close() 6892 6893 fh = logging.handlers.TimedRotatingFileHandler( 6894 self.fn, encoding="utf-8", when='W6', utc=False, 6895 atTime=datetime.time(1, 30, 0)) 6896 6897 test(DT(2012, 3, 11, 1, 29, 59), DT(2012, 3, 11, 1, 30)) 6898 test(DT(2012, 3, 11, 1, 30), DT(2012, 3, 18, 1, 30)) 6899 test(DT(2012, 3, 11, 1, 59, 59), DT(2012, 3, 18, 1, 30)) 6900 # No time between 2:00:00 and 3:00:00 at 2012-3-11. 6901 test(DT(2012, 3, 11, 3, 0), DT(2012, 3, 18, 1, 30)) 6902 test(DT(2012, 3, 11, 3, 30), DT(2012, 3, 18, 1, 30)) 6903 6904 # 1:00:00-2:00:00 is repeated twice at 2012-11-4. 6905 test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 4, 1, 30)) 6906 test(DT(2012, 11, 4, 1, 29, 59), DT(2012, 11, 4, 1, 30)) 6907 test(DT(2012, 11, 4, 1, 30), DT(2012, 11, 11, 1, 30)) 6908 test(DT(2012, 11, 4, 1, 59, 59), DT(2012, 11, 11, 1, 30)) 6909 # It is weird, but the rollover date jumps back from 2012-11-11 6910 # to 2012-11-4. 6911 test(DT(2012, 11, 4, 1, 0, fold=1), DT(2012, 11, 4, 1, 30, fold=1)) 6912 test(DT(2012, 11, 4, 1, 29, 59, fold=1), DT(2012, 11, 4, 1, 30, fold=1)) 6913 test(DT(2012, 11, 4, 1, 30, fold=1), DT(2012, 11, 11, 1, 30)) 6914 test(DT(2012, 11, 4, 1, 59, 59, fold=1), DT(2012, 11, 11, 1, 30)) 6915 test(DT(2012, 11, 4, 2, 0), DT(2012, 11, 11, 1, 30)) 6916 test(DT(2012, 11, 4, 2, 30), DT(2012, 11, 11, 1, 30)) 6917 6918 fh.close() 6919 6920 # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in 6921 # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0). 6922 @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0') 6923 def test_compute_rollover_MIDNIGHT_local_interval(self): 6924 # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00. 6925 DT = datetime.datetime 6926 def test(current, expected): 6927 actual = fh.computeRollover(current.timestamp()) 6928 diff = actual - expected.timestamp() 6929 if diff: 6930 self.assertEqual(diff, 0, datetime.timedelta(seconds=diff)) 6931 6932 fh = logging.handlers.TimedRotatingFileHandler( 6933 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, interval=3) 6934 6935 test(DT(2012, 3, 8, 23, 59, 59), DT(2012, 3, 11, 0, 0)) 6936 test(DT(2012, 3, 9, 0, 0), DT(2012, 3, 12, 0, 0)) 6937 test(DT(2012, 3, 9, 1, 0), DT(2012, 3, 12, 0, 0)) 6938 test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 13, 0, 0)) 6939 test(DT(2012, 3, 11, 0, 0), DT(2012, 3, 14, 0, 0)) 6940 test(DT(2012, 3, 11, 1, 0), DT(2012, 3, 14, 0, 0)) 6941 6942 test(DT(2012, 11, 1, 23, 59, 59), DT(2012, 11, 4, 0, 0)) 6943 test(DT(2012, 11, 2, 0, 0), DT(2012, 11, 5, 0, 0)) 6944 test(DT(2012, 11, 2, 1, 0), DT(2012, 11, 5, 0, 0)) 6945 test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 6, 0, 0)) 6946 test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 7, 0, 0)) 6947 test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 7, 0, 0)) 6948 6949 fh.close() 6950 6951 fh = logging.handlers.TimedRotatingFileHandler( 6952 self.fn, encoding="utf-8", when='MIDNIGHT', utc=False, interval=3, 6953 atTime=datetime.time(12, 0, 0)) 6954 6955 test(DT(2012, 3, 8, 11, 59, 59), DT(2012, 3, 10, 12, 0)) 6956 test(DT(2012, 3, 8, 12, 0), DT(2012, 3, 11, 12, 0)) 6957 test(DT(2012, 3, 8, 13, 0), DT(2012, 3, 11, 12, 0)) 6958 test(DT(2012, 3, 10, 11, 59, 59), DT(2012, 3, 12, 12, 0)) 6959 test(DT(2012, 3, 10, 12, 0), DT(2012, 3, 13, 12, 0)) 6960 test(DT(2012, 3, 10, 13, 0), DT(2012, 3, 13, 12, 0)) 6961 6962 test(DT(2012, 11, 1, 11, 59, 59), DT(2012, 11, 3, 12, 0)) 6963 test(DT(2012, 11, 1, 12, 0), DT(2012, 11, 4, 12, 0)) 6964 test(DT(2012, 11, 1, 13, 0), DT(2012, 11, 4, 12, 0)) 6965 test(DT(2012, 11, 3, 11, 59, 59), DT(2012, 11, 5, 12, 0)) 6966 test(DT(2012, 11, 3, 12, 0), DT(2012, 11, 6, 12, 0)) 6967 test(DT(2012, 11, 3, 13, 0), DT(2012, 11, 6, 12, 0)) 6968 6969 fh.close() 6970 6971 # Run with US-style DST rules: DST begins 2 a.m. on second Sunday in 6972 # March (M3.2.0) and ends 2 a.m. on first Sunday in November (M11.1.0). 6973 @support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0') 6974 def test_compute_rollover_W6_local_interval(self): 6975 # DST begins at 2012-3-11T02:00:00 and ends at 2012-11-4T02:00:00. 6976 DT = datetime.datetime 6977 def test(current, expected): 6978 actual = fh.computeRollover(current.timestamp()) 6979 diff = actual - expected.timestamp() 6980 if diff: 6981 self.assertEqual(diff, 0, datetime.timedelta(seconds=diff)) 6982 6983 fh = logging.handlers.TimedRotatingFileHandler( 6984 self.fn, encoding="utf-8", when='W6', utc=False, interval=3) 6985 6986 test(DT(2012, 2, 19, 23, 59, 59), DT(2012, 3, 5, 0, 0)) 6987 test(DT(2012, 2, 20, 0, 0), DT(2012, 3, 12, 0, 0)) 6988 test(DT(2012, 2, 20, 1, 0), DT(2012, 3, 12, 0, 0)) 6989 test(DT(2012, 3, 4, 23, 59, 59), DT(2012, 3, 19, 0, 0)) 6990 test(DT(2012, 3, 5, 0, 0), DT(2012, 3, 26, 0, 0)) 6991 test(DT(2012, 3, 5, 1, 0), DT(2012, 3, 26, 0, 0)) 6992 6993 test(DT(2012, 10, 14, 23, 59, 59), DT(2012, 10, 29, 0, 0)) 6994 test(DT(2012, 10, 15, 0, 0), DT(2012, 11, 5, 0, 0)) 6995 test(DT(2012, 10, 15, 1, 0), DT(2012, 11, 5, 0, 0)) 6996 test(DT(2012, 10, 28, 23, 59, 59), DT(2012, 11, 12, 0, 0)) 6997 test(DT(2012, 10, 29, 0, 0), DT(2012, 11, 19, 0, 0)) 6998 test(DT(2012, 10, 29, 1, 0), DT(2012, 11, 19, 0, 0)) 6999 7000 fh.close() 7001 7002 fh = logging.handlers.TimedRotatingFileHandler( 7003 self.fn, encoding="utf-8", when='W6', utc=False, interval=3, 7004 atTime=datetime.time(0, 0, 0)) 7005 7006 test(DT(2012, 2, 25, 23, 59, 59), DT(2012, 3, 11, 0, 0)) 7007 test(DT(2012, 2, 26, 0, 0), DT(2012, 3, 18, 0, 0)) 7008 test(DT(2012, 2, 26, 1, 0), DT(2012, 3, 18, 0, 0)) 7009 test(DT(2012, 3, 10, 23, 59, 59), DT(2012, 3, 25, 0, 0)) 7010 test(DT(2012, 3, 11, 0, 0), DT(2012, 4, 1, 0, 0)) 7011 test(DT(2012, 3, 11, 1, 0), DT(2012, 4, 1, 0, 0)) 7012 7013 test(DT(2012, 10, 20, 23, 59, 59), DT(2012, 11, 4, 0, 0)) 7014 test(DT(2012, 10, 21, 0, 0), DT(2012, 11, 11, 0, 0)) 7015 test(DT(2012, 10, 21, 1, 0), DT(2012, 11, 11, 0, 0)) 7016 test(DT(2012, 11, 3, 23, 59, 59), DT(2012, 11, 18, 0, 0)) 7017 test(DT(2012, 11, 4, 0, 0), DT(2012, 11, 25, 0, 0)) 7018 test(DT(2012, 11, 4, 1, 0), DT(2012, 11, 25, 0, 0)) 7019 7020 fh.close() 7021 7022 fh = logging.handlers.TimedRotatingFileHandler( 7023 self.fn, encoding="utf-8", when='W6', utc=False, interval=3, 7024 atTime=datetime.time(12, 0, 0)) 7025 7026 test(DT(2012, 2, 18, 11, 59, 59), DT(2012, 3, 4, 12, 0)) 7027 test(DT(2012, 2, 19, 12, 0), DT(2012, 3, 11, 12, 0)) 7028 test(DT(2012, 2, 19, 13, 0), DT(2012, 3, 11, 12, 0)) 7029 test(DT(2012, 3, 4, 11, 59, 59), DT(2012, 3, 18, 12, 0)) 7030 test(DT(2012, 3, 4, 12, 0), DT(2012, 3, 25, 12, 0)) 7031 test(DT(2012, 3, 4, 13, 0), DT(2012, 3, 25, 12, 0)) 7032 7033 test(DT(2012, 10, 14, 11, 59, 59), DT(2012, 10, 28, 12, 0)) 7034 test(DT(2012, 10, 14, 12, 0), DT(2012, 11, 4, 12, 0)) 7035 test(DT(2012, 10, 14, 13, 0), DT(2012, 11, 4, 12, 0)) 7036 test(DT(2012, 10, 28, 11, 59, 59), DT(2012, 11, 11, 12, 0)) 7037 test(DT(2012, 10, 28, 12, 0), DT(2012, 11, 18, 12, 0)) 7038 test(DT(2012, 10, 28, 13, 0), DT(2012, 11, 18, 12, 0)) 7039 7040 fh.close() 7041 7042 7043def secs(**kw): 7044 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 7045 7046for when, exp in (('S', 1), 7047 ('M', 60), 7048 ('H', 60 * 60), 7049 ('D', 60 * 60 * 24), 7050 ('MIDNIGHT', 60 * 60 * 24), 7051 # current time (epoch start) is a Thursday, W0 means Monday 7052 ('W0', secs(days=4, hours=24)), 7053 ): 7054 for interval in 1, 3: 7055 def test_compute_rollover(self, when=when, interval=interval, exp=exp): 7056 rh = logging.handlers.TimedRotatingFileHandler( 7057 self.fn, encoding="utf-8", when=when, interval=interval, backupCount=0, utc=True) 7058 currentTime = 0.0 7059 actual = rh.computeRollover(currentTime) 7060 if when.startswith('W'): 7061 exp += secs(days=7*(interval-1)) 7062 else: 7063 exp *= interval 7064 if exp != actual: 7065 # Failures occur on some systems for MIDNIGHT and W0. 7066 # Print detailed calculation for MIDNIGHT so we can try to see 7067 # what's going on 7068 if when == 'MIDNIGHT': 7069 try: 7070 if rh.utc: 7071 t = time.gmtime(currentTime) 7072 else: 7073 t = time.localtime(currentTime) 7074 currentHour = t[3] 7075 currentMinute = t[4] 7076 currentSecond = t[5] 7077 # r is the number of seconds left between now and midnight 7078 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 7079 currentMinute) * 60 + 7080 currentSecond) 7081 result = currentTime + r 7082 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 7083 print('currentHour: %s' % currentHour, file=sys.stderr) 7084 print('currentMinute: %s' % currentMinute, file=sys.stderr) 7085 print('currentSecond: %s' % currentSecond, file=sys.stderr) 7086 print('r: %s' % r, file=sys.stderr) 7087 print('result: %s' % result, file=sys.stderr) 7088 except Exception as e: 7089 print('exception in diagnostic code: %s' % e, file=sys.stderr) 7090 self.assertEqual(exp, actual) 7091 rh.close() 7092 name = "test_compute_rollover_%s" % when 7093 if interval > 1: 7094 name += "_interval" 7095 test_compute_rollover.__name__ = name 7096 setattr(TimedRotatingFileHandlerTest, name, test_compute_rollover) 7097 7098 7099@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 7100class NTEventLogHandlerTest(BaseTest): 7101 def test_basic(self): 7102 logtype = 'Application' 7103 elh = win32evtlog.OpenEventLog(None, logtype) 7104 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 7105 7106 try: 7107 h = logging.handlers.NTEventLogHandler('test_logging') 7108 except pywintypes.error as e: 7109 if e.winerror == 5: # access denied 7110 raise unittest.SkipTest('Insufficient privileges to run test') 7111 raise 7112 7113 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 7114 h.handle(r) 7115 h.close() 7116 # Now see if the event is recorded 7117 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 7118 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 7119 win32evtlog.EVENTLOG_SEQUENTIAL_READ 7120 found = False 7121 GO_BACK = 100 7122 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 7123 for e in events: 7124 if e.SourceName != 'test_logging': 7125 continue 7126 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 7127 if msg != 'Test Log Message\r\n': 7128 continue 7129 found = True 7130 break 7131 msg = 'Record not found in event log, went back %d records' % GO_BACK 7132 self.assertTrue(found, msg=msg) 7133 7134 7135class MiscTestCase(unittest.TestCase): 7136 def test__all__(self): 7137 not_exported = { 7138 'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe', 7139 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle', 7140 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root', 7141 'threading', 'logAsyncioTasks'} 7142 support.check__all__(self, logging, not_exported=not_exported) 7143 7144 7145# Set the locale to the platform-dependent default. I have no idea 7146# why the test does this, but in any case we save the current locale 7147# first and restore it at the end. 7148def setUpModule(): 7149 unittest.enterModuleContext(support.run_with_locale('LC_ALL', '')) 7150 7151 7152if __name__ == "__main__": 7153 unittest.main() 7154