# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""Test harness for the logging module. Run all tests.

Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
"""

import logging
import logging.handlers
import logging.config

import codecs
import configparser
import copy
import datetime
import pathlib
import pickle
import io
import gc
import json
import os
import queue
import random
import re
import signal
import socket
import struct
import sys
import tempfile
from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support
import textwrap
import threading
import time
import unittest
import warnings
import weakref

import asyncore
from http.server import HTTPServer, BaseHTTPRequestHandler
import smtpd
from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
                          ThreadingTCPServer, StreamRequestHandler)

try:
    import win32evtlog, win32evtlogutil, pywintypes
except ImportError:
    win32evtlog = win32evtlogutil = pywintypes = None

try:
    import zlib
except ImportError:
    pass

class BaseTest(unittest.TestCase):

    """Base class for logging tests."""

    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        self._threading_key = support.threading_setup()

        logger_dict = logging.getLogger().manager.loggerDict
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = saved_loggers = logger_dict.copy()
            self.saved_name_to_level = logging._nameToLevel.copy()
            self.saved_level_to_name = logging._levelToName.copy()
            self.logger_states = logger_states = {}
            for name in saved_loggers:
                logger_states[name] = getattr(saved_loggers[name],
                                              'disabled', None)
        finally:
            logging._releaseLock()

        # Set two unused loggers
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        if self.logger1.hasHandlers():
            hlist = self.logger1.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        if self.logger2.hasHandlers():
            hlist = self.logger2.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        self.root_logger.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        while self.root_logger.handlers:
            h = self.root_logger.handlers[0]
            self.root_logger.removeHandler(h)
            h.close()
        self.root_logger.setLevel(self.original_logging_level)
        logging._acquireLock()
        try:
            logging._levelToName.clear()
            logging._levelToName.update(self.saved_level_to_name)
            logging._nameToLevel.clear()
            logging._nameToLevel.update(self.saved_name_to_level)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            manager = logging.getLogger().manager
            manager.disable = 0
            loggerDict = manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
            logger_states = self.logger_states
            for name in self.logger_states:
                if logger_states[name] is not None:
                    self.saved_loggers[name].disabled = logger_states[name]
        finally:
            logging._releaseLock()

        self.doCleanups()
        support.threading_cleanup(*self._threading_key)

    def assert_log_lines(self, expected_values, stream=None, pat=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples."""
        stream = stream or self.stream
        pat = re.compile(pat or self.expected_log_pat)
        actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num


class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])

    def test_regression_22386(self):
        """See issue #22386 for more information."""
        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')

    def test_issue27935(self):
        fatal = logging.getLevelName('FATAL')
        self.assertEqual(fatal, logging.FATAL)

    def test_regression_29220(self):
        """See issue #29220 for more information."""
        logging.addLevelName(logging.INFO, '')
        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
        self.assertEqual(logging.getLevelName(logging.INFO), '')
        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)

class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def test_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.
        filter_ = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filter_)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filter_)

    def test_callable_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.

        def filterfunc(record):
            parts = record.name.split('.')
            prefix = '.'.join(parts[:2])
            return prefix == 'spam.eggs'

        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filterfunc)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filterfunc)

    def test_empty_filter(self):
        f = logging.Filter()
        r = logging.makeLogRecord({'name': 'spam.eggs'})
        self.assertTrue(f.filter(r))

#
# First, we define our levels. There can be as many as you want - the only
# limitations are that they should be integers, the lowest should be > 0 and
# larger values mean less information being logged. If you need specific
# level values which do not fit into these limitations, you can use a
# mapping dictionary to convert between your application levels and the
# logging system.
#
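
# A minimal illustrative sketch, not used by the tests below, of the
# mapping-dictionary approach mentioned in the comment above.  The
# application-side names and values here are hypothetical: each is
# translated to a value the logging system accepts before Logger.log()
# is called.
_EXAMPLE_APP_LEVEL_TO_LOGGING = {
    'trace': logging.DEBUG,       # hypothetical application-defined levels
    'notice': logging.INFO,
    'fatal': logging.CRITICAL,
}

def _example_log_with_app_level(logger, app_level, msg):
    # Translate the application's own level, then delegate to the logger.
    logger.log(_EXAMPLE_APP_LEVEL_TO_LOGGING[app_level], msg)
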
SILENT = 120
TACITURN = 119
TERSE = 118
EFFUSIVE = 117
SOCIABLE = 116
VERBOSE = 115
TALKATIVE = 114
GARRULOUS = 113
CHATTERBOX = 112
BORING = 111

LEVEL_RANGE = range(BORING, SILENT + 1)

#
# Next, we define names for our levels. You don't need to do this - in which
# case the system will use "Level n" to denote the text for the level.
#
my_logging_levels = {
    SILENT : 'Silent',
    TACITURN : 'Taciturn',
    TERSE : 'Terse',
    EFFUSIVE : 'Effusive',
    SOCIABLE : 'Sociable',
    VERBOSE : 'Verbose',
    TALKATIVE : 'Talkative',
    GARRULOUS : 'Garrulous',
    CHATTERBOX : 'Chatterbox',
    BORING : 'Boring',
}

class GarrulousFilter(logging.Filter):

    """A filter which blocks garrulous messages."""

    def filter(self, record):
        return record.levelno != GARRULOUS

class VerySpecificFilter(logging.Filter):

    """A filter which blocks sociable and taciturn messages."""

    def filter(self, record):
        return record.levelno not in [SOCIABLE, TACITURN]


class CustomLevelsAndFiltersTest(BaseTest):

    """Test various filtering possibilities with custom logging levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        for k, v in my_logging_levels.items():
            logging.addLevelName(k, v)

    def log_at_all_levels(self, logger):
        for lvl in LEVEL_RANGE:
            logger.log(lvl, self.next_message())

    def test_logger_filter(self):
        # Filter at logger level.
        self.root_logger.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(self.root_logger)
        self.assert_log_lines([
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ])

    def test_handler_filter(self):
        # Filter at handler level.
        self.root_logger.handlers[0].setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines([
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ])
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # Set a specific filter object on the handler, and then add another
        # filter object on the logger itself.
        handler = self.root_logger.handlers[0]
        specific_filter = None
        garr = GarrulousFilter()
        handler.addFilter(garr)
        try:
            self.log_at_all_levels(self.root_logger)
            first_lines = [
                # Notice how 'Garrulous' is missing
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_lines)

            specific_filter = VerySpecificFilter()
            self.root_logger.addFilter(specific_filter)
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines(first_lines + [
                # Not only 'Garrulous' is still missing, but also 'Sociable'
                # and 'Taciturn'
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ])
        finally:
            if specific_filter:
                self.root_logger.removeFilter(specific_filter)
            handler.removeFilter(garr)


class HandlerTest(BaseTest):
    def test_name(self):
        h = logging.Handler()
        h.name = 'generic'
        self.assertEqual(h.name, 'generic')
        h.name = 'anothergeneric'
        self.assertEqual(h.name, 'anothergeneric')
        self.assertRaises(NotImplementedError, h.emit, None)

    def test_builtin_handlers(self):
        # We can't actually *use* too many handlers in the tests,
        # but we can try instantiating them with various options
        if sys.platform in ('linux', 'darwin'):
            for existing in (True, False):
                fd, fn = tempfile.mkstemp()
                os.close(fd)
                if not existing:
                    os.unlink(fn)
                h = logging.handlers.WatchedFileHandler(fn, delay=True)
                if existing:
                    dev, ino = h.dev, h.ino
                    self.assertEqual(dev, -1)
                    self.assertEqual(ino, -1)
                    r = logging.makeLogRecord({'msg': 'Test'})
                    h.handle(r)
                    # Now remove the file.
                    os.unlink(fn)
                    self.assertFalse(os.path.exists(fn))
                    # The next call should recreate the file.
                    h.handle(r)
                    self.assertTrue(os.path.exists(fn))
                else:
                    self.assertEqual(h.dev, -1)
                    self.assertEqual(h.ino, -1)
                h.close()
                if existing:
                    os.unlink(fn)
            if sys.platform == 'darwin':
                sockname = '/var/run/syslog'
            else:
                sockname = '/dev/log'
            try:
                h = logging.handlers.SysLogHandler(sockname)
                self.assertEqual(h.facility, h.LOG_USER)
                self.assertTrue(h.unixsocket)
                h.close()
            except OSError:  # syslogd might not be available
                pass
        for method in ('GET', 'POST', 'PUT'):
            if method == 'PUT':
                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
                                  'localhost', '/log', method)
            else:
                h = logging.handlers.HTTPHandler('localhost', '/log', method)
                h.close()
        h = logging.handlers.BufferingHandler(0)
        r = logging.makeLogRecord({})
        self.assertTrue(h.shouldFlush(r))
        h.close()
        h = logging.handlers.BufferingHandler(1)
        self.assertFalse(h.shouldFlush(r))
        h.close()

    def test_path_objects(self):
        """
        Test that Path objects are accepted as filename arguments to handlers.

        See Issue #27493.
602 """ 603 fd, fn = tempfile.mkstemp() 604 os.close(fd) 605 os.unlink(fn) 606 pfn = pathlib.Path(fn) 607 cases = ( 608 (logging.FileHandler, (pfn, 'w')), 609 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 610 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 611 ) 612 if sys.platform in ('linux', 'darwin'): 613 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 614 for cls, args in cases: 615 h = cls(*args) 616 self.assertTrue(os.path.exists(fn)) 617 h.close() 618 os.unlink(fn) 619 620 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 621 def test_race(self): 622 # Issue #14632 refers. 623 def remove_loop(fname, tries): 624 for _ in range(tries): 625 try: 626 os.unlink(fname) 627 self.deletion_time = time.time() 628 except OSError: 629 pass 630 time.sleep(0.004 * random.randint(0, 4)) 631 632 del_count = 500 633 log_count = 500 634 635 self.handle_time = None 636 self.deletion_time = None 637 638 for delay in (False, True): 639 fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') 640 os.close(fd) 641 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 642 remover.daemon = True 643 remover.start() 644 h = logging.handlers.WatchedFileHandler(fn, delay=delay) 645 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 646 h.setFormatter(f) 647 try: 648 for _ in range(log_count): 649 time.sleep(0.005) 650 r = logging.makeLogRecord({'msg': 'testing' }) 651 try: 652 self.handle_time = time.time() 653 h.handle(r) 654 except Exception: 655 print('Deleted at %s, ' 656 'opened at %s' % (self.deletion_time, 657 self.handle_time)) 658 raise 659 finally: 660 remover.join() 661 h.close() 662 if os.path.exists(fn): 663 os.unlink(fn) 664 665 # The implementation relies on os.register_at_fork existing, but we test 666 # based on os.fork existing because that is what users and this test use. 667 # This helps ensure that when fork exists (the important concept) that the 668 # register_at_fork mechanism is also present and used. 669 @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().') 670 def test_post_fork_child_no_deadlock(self): 671 """Ensure child logging locks are not held; bpo-6721 & bpo-36533.""" 672 class _OurHandler(logging.Handler): 673 def __init__(self): 674 super().__init__() 675 self.sub_handler = logging.StreamHandler( 676 stream=open('/dev/null', 'wt')) 677 678 def emit(self, record): 679 self.sub_handler.acquire() 680 try: 681 self.sub_handler.emit(record) 682 finally: 683 self.sub_handler.release() 684 685 self.assertEqual(len(logging._handlers), 0) 686 refed_h = _OurHandler() 687 self.addCleanup(refed_h.sub_handler.stream.close) 688 refed_h.name = 'because we need at least one for this test' 689 self.assertGreater(len(logging._handlers), 0) 690 self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1) 691 test_logger = logging.getLogger('test_post_fork_child_no_deadlock') 692 test_logger.addHandler(refed_h) 693 test_logger.setLevel(logging.DEBUG) 694 695 locks_held__ready_to_fork = threading.Event() 696 fork_happened__release_locks_and_end_thread = threading.Event() 697 698 def lock_holder_thread_fn(): 699 logging._acquireLock() 700 try: 701 refed_h.acquire() 702 try: 703 # Tell the main thread to do the fork. 704 locks_held__ready_to_fork.set() 705 706 # If the deadlock bug exists, the fork will happen 707 # without dealing with the locks we hold, deadlocking 708 # the child. 709 710 # Wait for a successful fork or an unreasonable amount of 711 # time before releasing our locks. 
                    # test we'd need communication from os.fork() as to when it
                    # has actually happened. Given this is a regression test
                    # for a fixed issue, potentially less reliably detecting
                    # regression via timing is acceptable for simplicity.
                    # The test will always take at least this long. :(
                    fork_happened__release_locks_and_end_thread.wait(0.5)
                finally:
                    refed_h.release()
            finally:
                logging._releaseLock()

        lock_holder_thread = threading.Thread(
            target=lock_holder_thread_fn,
            name='test_post_fork_child_no_deadlock lock holder')
        lock_holder_thread.start()

        locks_held__ready_to_fork.wait()
        pid = os.fork()
        if pid == 0:  # Child.
            try:
                test_logger.info(r'Child process did not deadlock. \o/')
            finally:
                os._exit(0)
        else:  # Parent.
            test_logger.info(r'Parent process returned from fork. \o/')
            fork_happened__release_locks_and_end_thread.set()
            lock_holder_thread.join()
            start_time = time.monotonic()
            while True:
                test_logger.debug('Waiting for child process.')
                waited_pid, status = os.waitpid(pid, os.WNOHANG)
                if waited_pid == pid:
                    break  # child process exited.
                if time.monotonic() - start_time > 7:
                    break  # so long? implies child deadlock.
                time.sleep(0.05)
            test_logger.debug('Done waiting.')
            if waited_pid != pid:
                os.kill(pid, signal.SIGKILL)
                waited_pid, status = os.waitpid(pid, 0)
                self.fail("child process deadlocked.")
            self.assertEqual(status, 0, msg="child process error")


class BadStream(object):
    def write(self, data):
        raise RuntimeError('deliberate mistake')

class TestStreamHandler(logging.StreamHandler):
    def handleError(self, record):
        self.error_record = record

class StreamWithIntName(object):
    level = logging.NOTSET
    name = 2

class StreamHandlerTest(BaseTest):
    def test_error_handling(self):
        h = TestStreamHandler(BadStream())
        r = logging.makeLogRecord({})
        old_raise = logging.raiseExceptions

        try:
            h.handle(r)
            self.assertIs(h.error_record, r)

            h = logging.StreamHandler(BadStream())
            with support.captured_stderr() as stderr:
                h.handle(r)
                msg = '\nRuntimeError: deliberate mistake\n'
                self.assertIn(msg, stderr.getvalue())

            logging.raiseExceptions = False
            with support.captured_stderr() as stderr:
                h.handle(r)
                self.assertEqual('', stderr.getvalue())
        finally:
            logging.raiseExceptions = old_raise

    def test_stream_setting(self):
        """
        Test setting the handler's stream
        """
        h = logging.StreamHandler()
        stream = io.StringIO()
        old = h.setStream(stream)
        self.assertIs(old, sys.stderr)
        actual = h.setStream(old)
        self.assertIs(actual, stream)
        # test that setting to existing value returns None
        actual = h.setStream(old)
        self.assertIsNone(actual)

    def test_can_represent_stream_with_int_name(self):
        h = logging.StreamHandler(StreamWithIntName())
        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging

class TestSMTPServer(smtpd.SMTPServer):
    """
    This class implements a test SMTP server.

    :param addr: A (host, port) tuple which the server listens on.
                 You can specify a port value of zero: the server's
                 *port* attribute will hold the actual port number
                 used, which can be used in client connections.
    :param handler: A callable which will be called to process
                    incoming messages. The handler will be passed
                    the client address tuple, who the message is from,
                    a list of recipients and the message data.
    :param poll_interval: The interval, in seconds, used in the underlying
                          :func:`select` or :func:`poll` call by
                          :func:`asyncore.loop`.
    :param sockmap: A dictionary which will be used to hold
                    :class:`asyncore.dispatcher` instances used by
                    :func:`asyncore.loop`. This avoids changing the
                    :mod:`asyncore` module's global state.
    """

    def __init__(self, addr, handler, poll_interval, sockmap):
        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
                                  decode_data=True)
        self.port = self.socket.getsockname()[1]
        self._handler = handler
        self._thread = None
        self.poll_interval = poll_interval

    def process_message(self, peer, mailfrom, rcpttos, data):
        """
        Delegates to the handler passed in to the server's constructor.

        Typically, this will be a test case method.
        :param peer: The client (host, port) tuple.
        :param mailfrom: The address of the sender.
        :param rcpttos: The addresses of the recipients.
        :param data: The message.
        """
        self._handler(peer, mailfrom, rcpttos, data)

    def start(self):
        """
        Start the server running on a separate daemon thread.
        """
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        t.setDaemon(True)
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the :mod:`asyncore` loop until normal termination
        conditions arise.
        :param poll_interval: The interval, in seconds, used in the underlying
                              :func:`select` or :func:`poll` call by
                              :func:`asyncore.loop`.
        """
        asyncore.loop(poll_interval, map=self._map)

    def stop(self, timeout=None):
        """
        Stop the thread by closing the server instance.
        Wait for the server thread to terminate.

        :param timeout: How long to wait for the server thread
                        to terminate.
        """
        self.close()
        support.join_thread(self._thread, timeout)
        self._thread = None
        asyncore.close_all(map=self._map, ignore_all=True)


class ControlMixin(object):
    """
    This mixin is used to start a server on a separate thread, and
    shut it down programmatically. Request handling is simplified - instead
    of needing to derive a suitable RequestHandler subclass, you just
    provide a callable which will be passed each received request to be
    processed.

    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request. This handler is called on the
                    server thread, effectively meaning that requests are
                    processed serially. While not quite Web scale ;-),
                    this should be fine for testing applications.
    :param poll_interval: The polling interval in seconds.
    """
    def __init__(self, handler, poll_interval):
        self._thread = None
        self.poll_interval = poll_interval
        self._handler = handler
        self.ready = threading.Event()

    def start(self):
        """
        Create a daemon thread to run the server, and start it.
        """
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        t.setDaemon(True)
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the server. Set the ready flag before entering the
        service loop.
921 """ 922 self.ready.set() 923 super(ControlMixin, self).serve_forever(poll_interval) 924 925 def stop(self, timeout=None): 926 """ 927 Tell the server thread to stop, and wait for it to do so. 928 929 :param timeout: How long to wait for the server thread 930 to terminate. 931 """ 932 self.shutdown() 933 if self._thread is not None: 934 support.join_thread(self._thread, timeout) 935 self._thread = None 936 self.server_close() 937 self.ready.clear() 938 939class TestHTTPServer(ControlMixin, HTTPServer): 940 """ 941 An HTTP server which is controllable using :class:`ControlMixin`. 942 943 :param addr: A tuple with the IP address and port to listen on. 944 :param handler: A handler callable which will be called with a 945 single parameter - the request - in order to 946 process the request. 947 :param poll_interval: The polling interval in seconds. 948 :param log: Pass ``True`` to enable log messages. 949 """ 950 def __init__(self, addr, handler, poll_interval=0.5, 951 log=False, sslctx=None): 952 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 953 def __getattr__(self, name, default=None): 954 if name.startswith('do_'): 955 return self.process_request 956 raise AttributeError(name) 957 958 def process_request(self): 959 self.server._handler(self) 960 961 def log_message(self, format, *args): 962 if log: 963 super(DelegatingHTTPRequestHandler, 964 self).log_message(format, *args) 965 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 966 ControlMixin.__init__(self, handler, poll_interval) 967 self.sslctx = sslctx 968 969 def get_request(self): 970 try: 971 sock, addr = self.socket.accept() 972 if self.sslctx: 973 sock = self.sslctx.wrap_socket(sock, server_side=True) 974 except OSError as e: 975 # socket errors are silenced by the caller, print them here 976 sys.stderr.write("Got an error:\n%s\n" % e) 977 raise 978 return sock, addr 979 980class TestTCPServer(ControlMixin, ThreadingTCPServer): 981 """ 982 A TCP server which is controllable using :class:`ControlMixin`. 983 984 :param addr: A tuple with the IP address and port to listen on. 985 :param handler: A handler callable which will be called with a single 986 parameter - the request - in order to process the request. 987 :param poll_interval: The polling interval in seconds. 988 :bind_and_activate: If True (the default), binds the server and starts it 989 listening. If False, you need to call 990 :meth:`server_bind` and :meth:`server_activate` at 991 some later time before calling :meth:`start`, so that 992 the server will set up the socket and listen on it. 993 """ 994 995 allow_reuse_address = True 996 997 def __init__(self, addr, handler, poll_interval=0.5, 998 bind_and_activate=True): 999 class DelegatingTCPRequestHandler(StreamRequestHandler): 1000 1001 def handle(self): 1002 self.server._handler(self) 1003 ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, 1004 bind_and_activate) 1005 ControlMixin.__init__(self, handler, poll_interval) 1006 1007 def server_bind(self): 1008 super(TestTCPServer, self).server_bind() 1009 self.port = self.socket.getsockname()[1] 1010 1011class TestUDPServer(ControlMixin, ThreadingUDPServer): 1012 """ 1013 A UDP server which is controllable using :class:`ControlMixin`. 1014 1015 :param addr: A tuple with the IP address and port to listen on. 1016 :param handler: A handler callable which will be called with a 1017 single parameter - the request - in order to 1018 process the request. 
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds.
    :bind_and_activate: If True (the default), binds the server and
                        starts it listening. If False, you need to
                        call :meth:`server_bind` and
                        :meth:`server_activate` at some later time
                        before calling :meth:`start`, so that the server will
                        set up the socket and listen on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingUDPRequestHandler(DatagramRequestHandler):

            def handle(self):
                self.server._handler(self)

            def finish(self):
                data = self.wfile.getvalue()
                if data:
                    try:
                        super(DelegatingUDPRequestHandler, self).finish()
                    except OSError:
                        if not self.server._closed:
                            raise

        ThreadingUDPServer.__init__(self, addr,
                                    DelegatingUDPRequestHandler,
                                    bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)
        self._closed = False

    def server_bind(self):
        super(TestUDPServer, self).server_bind()
        self.port = self.socket.getsockname()[1]

    def server_close(self):
        super(TestUDPServer, self).server_close()
        self._closed = True

if hasattr(socket, "AF_UNIX"):
    class TestUnixStreamServer(TestTCPServer):
        address_family = socket.AF_UNIX

    class TestUnixDatagramServer(TestUDPServer):
        address_family = socket.AF_UNIX

# - end of server_helper section

class SMTPHandlerTest(BaseTest):
    # bpo-14314, bpo-19665, bpo-34092: don't wait forever, timeout of 1 minute
    TIMEOUT = 60.0

    def test_basic(self):
        sockmap = {}
        server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        addr = (support.HOST, server.port)
        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                         timeout=self.TIMEOUT)
        self.assertEqual(h.toaddrs, ['you'])
        self.messages = []
        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
        self.handled = threading.Event()
        h.handle(r)
        self.handled.wait(self.TIMEOUT)
        server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello \u2713'))
        h.close()

    def process_message(self, *args):
        self.messages.append(args)
        self.handled.set()

class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        # criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        for n in (4, 14):
            for i in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            # flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)

    def test_flush_on_close(self):
        """
        Test that the flush-on-close configuration works as expected.
        """
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.removeHandler(self.mem_hdlr)
        # Default behaviour is to flush on close. Check that it happens.
        self.mem_hdlr.close()
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
        ]
        self.assert_log_lines(lines)
        # Now configure for flushing not to be done on close.
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr,
                                                       False)
        self.mem_logger.addHandler(self.mem_hdlr)
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.info(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.removeHandler(self.mem_hdlr)
        self.mem_hdlr.close()
        # assert that no new lines have been added
        self.assert_log_lines(lines)  # no change


class ExceptionFormatter(logging.Formatter):
    """A special exception formatter."""
    def formatException(self, ei):
        return "Got a [%s]" % ei[0].__name__


class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file."""

    check_no_resource_warning = support.check_no_resource_warning
    expected_log_pat = r"^(\w+) \+\+ (\w+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1a moves the handler to the root.
    config1a = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    # config7 adds a compiler logger, and uses kwargs instead of args.
    config7 = """
    [loggers]
    keys=root,parser,compiler

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_compiler]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    kwargs={'stream': sys.stdout,}

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config 8, check for resource warning
    config8 = r"""
    [loggers]
    keys=root

    [handlers]
    keys=file

    [formatters]
    keys=

    [logger_root]
    level=DEBUG
    handlers=file

    [handler_file]
    class=FileHandler
    level=DEBUG
    args=("{tempfile}",)
    """

    disable_test = """
    [loggers]
    keys=root

    [handlers]
    keys=screen

    [formatters]
    keys=

    [logger_root]
    level=DEBUG
    handlers=screen

    [handler_screen]
    level=DEBUG
    class=StreamHandler
    args=(sys.stdout,)
    formatter=
    """

    def apply_config(self, conf, **kwargs):
        file = io.StringIO(textwrap.dedent(conf))
        logging.config.fileConfig(file, **kwargs)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with support.captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config0_using_cp_ok(self):
        # A simple config file which overrides the default settings.
        with support.captured_stdout() as output:
            file = io.StringIO(textwrap.dedent(self.config0))
            cp = configparser.ConfigParser()
            cp.read_file(file)
            logging.config.fileConfig(cp)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well.
        with support.captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # A simple config file which overrides the default settings.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config3_failure(self):
        # A simple config file which overrides the default settings.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with support.captured_stdout() as output:
            self.apply_config(self.config4)
            logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        self.test_config1_ok(config=self.config6)

    def test_config7_ok(self):
        with support.captured_stdout() as output:
            self.apply_config(self.config1a)
            logger = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with support.captured_stdout() as output:
            self.apply_config(self.config7)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config8_ok(self):

        def cleanup(h1, fn):
            h1.close()
            os.remove(fn)

        with self.check_no_resource_warning():
            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
            os.close(fd)

            # Replace single backslash with double backslash in windows
            # to avoid unicode error during string formatting
            if os.name == "nt":
                fn = fn.replace("\\", "\\\\")

            config8 = self.config8.format(tempfile=fn)

            self.apply_config(config8)
            self.apply_config(config8)

        handler = logging.root.handlers[0]
        self.addCleanup(cleanup, handler, fn)

    def test_logger_disabling(self):
        self.apply_config(self.disable_test)
        logger = logging.getLogger('some_pristine_logger')
        self.assertFalse(logger.disabled)
        self.apply_config(self.disable_test)
        self.assertTrue(logger.disabled)
        self.apply_config(self.disable_test, disable_existing_loggers=False)
        self.assertFalse(logger.disabled)

    def test_defaults_do_no_interpolation(self):
        """bpo-33802 defaults should not get interpolated"""
        ini = textwrap.dedent("""
            [formatters]
            keys=default

            [formatter_default]

            [handlers]
            keys=console

            [handler_console]
            class=logging.StreamHandler
            args=tuple()

            [loggers]
            keys=root

            [logger_root]
            formatter=default
            handlers=console
            """).strip()
        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
        try:
            os.write(fd, ini.encode('ascii'))
            os.close(fd)
            logging.config.fileConfig(
                fn,
                defaults=dict(
                    version=1,
                    disable_existing_loggers=False,
                    formatters={
                        "generic": {
                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
                            "class": "logging.Formatter"
                        },
                    },
                )
            )
        finally:
            os.unlink(fn)


class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    server_class = TestTCPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_socket, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SocketHandler
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        self.handled = threading.Semaphore(0)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
            if self.server:
                self.server.stop(2.0)
        finally:
            BaseTest.tearDown(self)

    def handle_socket(self, request):
        conn = request.connection
        while True:
            chunk = conn.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = conn.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + conn.recv(slen - len(chunk))
            obj = pickle.loads(chunk)
            record = logging.makeLogRecord(obj)
            self.log_output += record.msg + '\n'
            self.handled.release()

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("tcp")
        logger.error("spam")
        self.handled.acquire()
        logger.debug("eggs")
        self.handled.acquire()
        self.assertEqual(self.log_output, "spam\neggs\n")

    def test_noserver(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # Avoid timing-related failures due to SocketHandler's own hard-wired
        # one-second timeout on socket.create_connection() (issue #16264).
        self.sock_hdlr.retryStart = 2.5
        # Kill the server
        self.server.stop(2.0)
        # The logging call should try to connect, which should fail
        try:
            raise RuntimeError('Deliberate mistake')
        except RuntimeError:
            self.root_logger.exception('Never sent')
        self.root_logger.error('Never sent, either')
        now = time.time()
        self.assertGreater(self.sock_hdlr.retryTime, now)
        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
        self.root_logger.error('Nor this')

def _get_temp_domain_socket():
    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
    os.close(fd)
    # just need a name - file can't be present, or we'll get an
    # 'address already in use' error.
    os.remove(fn)
    return fn

@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSocketHandlerTest(SocketHandlerTest):

    """Test for SocketHandler with unix sockets."""

    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixStreamServer

    def setUp(self):
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        SocketHandlerTest.setUp(self)

    def tearDown(self):
        SocketHandlerTest.tearDown(self)
        support.unlink(self.address)

class DatagramHandlerTest(BaseTest):

    """Test for DatagramHandler."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a UDP server to receive log messages, and a DatagramHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_datagram, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.DatagramHandler
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        self.handled = threading.Event()

    def tearDown(self):
        """Shutdown the UDP server."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        slen = struct.pack('>L', 0)  # length of prefix
        packet = request.packet[len(slen):]
        obj = pickle.loads(packet)
        record = logging.makeLogRecord(obj)
        self.log_output += record.msg + '\n'
        self.handled.set()

    def test_output(self):
        # The log message sent to the DatagramHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("udp")
        logger.error("spam")
        self.handled.wait()
        self.handled.clear()
        logger.error("eggs")
        self.handled.wait()
        self.assertEqual(self.log_output, "spam\neggs\n")

@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixDatagramHandlerTest(DatagramHandlerTest):

    """Test for DatagramHandler using Unix sockets."""

    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        DatagramHandlerTest.setUp(self)

    def tearDown(self):
        DatagramHandlerTest.tearDown(self)
        support.unlink(self.address)

class SysLogHandlerTest(BaseTest):

    """Test for SysLogHandler using UDP."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a UDP server to receive log messages, and a SysLogHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sl_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_datagram, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SysLogHandler
        if isinstance(server.server_address, tuple):
            self.sl_hdlr = hcls((server.server_address[0], server.port))
        else:
            self.sl_hdlr = hcls(server.server_address)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sl_hdlr)
        self.handled = threading.Event()

    def tearDown(self):
        """Shutdown the server."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sl_hdlr:
                self.root_logger.removeHandler(self.sl_hdlr)
                self.sl_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        self.log_output = request.packet
        self.handled.set()

    def test_output(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # The log message sent to the SysLogHandler is properly received.
        logger = logging.getLogger("slh")
        logger.error("sp\xe4m")
        self.handled.wait()
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
        self.handled.clear()
        self.sl_hdlr.append_nul = False
        logger.error("sp\xe4m")
        self.handled.wait()
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
        self.handled.clear()
        self.sl_hdlr.ident = "h\xe4m-"
        logger.error("sp\xe4m")
        self.handled.wait()
        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')

@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with Unix sockets."""

    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        SysLogHandlerTest.setUp(self)

    def tearDown(self):
        SysLogHandlerTest.tearDown(self)
        support.unlink(self.address)

@unittest.skipUnless(support.IPV6_ENABLED,
                     'IPv6 support required for this test.')
class IPv6SysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with IPv6 host."""

    server_class = TestUDPServer
    address = ('::1', 0)

    def setUp(self):
        self.server_class.address_family = socket.AF_INET6
        super(IPv6SysLogHandlerTest, self).setUp()

    def tearDown(self):
        self.server_class.address_family = socket.AF_INET
        super(IPv6SysLogHandlerTest, self).tearDown()

class HTTPHandlerTest(BaseTest):
    """Test for HTTPHandler."""

    def setUp(self):
        """Set up an HTTP server to receive log messages, and an HTTPHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        self.handled = threading.Event()

    def handle_request(self, request):
        self.command = request.command
        self.log_data = urlparse(request.path)
        if self.command == 'POST':
            try:
                rlen = int(request.headers['Content-Length'])
                self.post_data = request.rfile.read(rlen)
            except:
                self.post_data = None
        request.send_response(200)
        request.end_headers()
        self.handled.set()

    def test_output(self):
        # The log message sent to the HTTPHandler is properly received.
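        # HTTPHandler sends the record's attribute dictionary as
        # percent-encoded name/value pairs (in the query string for GET,
        # in the request body for POST), which is why handle_request()
        # above recovers the fields with urllib.parse.parse_qs().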
1958 logger = logging.getLogger("http") 1959 root_logger = self.root_logger 1960 root_logger.removeHandler(self.root_logger.handlers[0]) 1961 for secure in (False, True): 1962 addr = ('localhost', 0) 1963 if secure: 1964 try: 1965 import ssl 1966 except ImportError: 1967 sslctx = None 1968 else: 1969 here = os.path.dirname(__file__) 1970 localhost_cert = os.path.join(here, "keycert.pem") 1971 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 1972 sslctx.load_cert_chain(localhost_cert) 1973 1974 context = ssl.create_default_context(cafile=localhost_cert) 1975 else: 1976 sslctx = None 1977 context = None 1978 self.server = server = TestHTTPServer(addr, self.handle_request, 1979 0.01, sslctx=sslctx) 1980 server.start() 1981 server.ready.wait() 1982 host = 'localhost:%d' % server.server_port 1983 secure_client = secure and sslctx 1984 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 1985 secure=secure_client, 1986 context=context, 1987 credentials=('foo', 'bar')) 1988 self.log_data = None 1989 root_logger.addHandler(self.h_hdlr) 1990 1991 for method in ('GET', 'POST'): 1992 self.h_hdlr.method = method 1993 self.handled.clear() 1994 msg = "sp\xe4m" 1995 logger.error(msg) 1996 self.handled.wait() 1997 self.assertEqual(self.log_data.path, '/frob') 1998 self.assertEqual(self.command, method) 1999 if method == 'GET': 2000 d = parse_qs(self.log_data.query) 2001 else: 2002 d = parse_qs(self.post_data.decode('utf-8')) 2003 self.assertEqual(d['name'], ['http']) 2004 self.assertEqual(d['funcName'], ['test_output']) 2005 self.assertEqual(d['msg'], [msg]) 2006 2007 self.server.stop(2.0) 2008 self.root_logger.removeHandler(self.h_hdlr) 2009 self.h_hdlr.close() 2010 2011class MemoryTest(BaseTest): 2012 2013 """Test memory persistence of logger objects.""" 2014 2015 def setUp(self): 2016 """Create a dict to remember potentially destroyed objects.""" 2017 BaseTest.setUp(self) 2018 self._survivors = {} 2019 2020 def _watch_for_survival(self, *args): 2021 """Watch the given objects for survival, by creating weakrefs to 2022 them.""" 2023 for obj in args: 2024 key = id(obj), repr(obj) 2025 self._survivors[key] = weakref.ref(obj) 2026 2027 def _assertTruesurvival(self): 2028 """Assert that all objects watched for survival have survived.""" 2029 # Trigger cycle breaking. 2030 gc.collect() 2031 dead = [] 2032 for (id_, repr_), ref in self._survivors.items(): 2033 if ref() is None: 2034 dead.append(repr_) 2035 if dead: 2036 self.fail("%d objects should have survived " 2037 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 2038 2039 def test_persistent_loggers(self): 2040 # Logger objects are persistent and retain their configuration, even 2041 # if visible references are destroyed. 2042 self.root_logger.setLevel(logging.INFO) 2043 foo = logging.getLogger("foo") 2044 self._watch_for_survival(foo) 2045 foo.setLevel(logging.DEBUG) 2046 self.root_logger.debug(self.next_message()) 2047 foo.debug(self.next_message()) 2048 self.assert_log_lines([ 2049 ('foo', 'DEBUG', '2'), 2050 ]) 2051 del foo 2052 # foo has survived. 2053 self._assertTruesurvival() 2054 # foo has retained its settings. 2055 bar = logging.getLogger("foo") 2056 bar.debug(self.next_message()) 2057 self.assert_log_lines([ 2058 ('foo', 'DEBUG', '2'), 2059 ('foo', 'DEBUG', '3'), 2060 ]) 2061 2062 2063class EncodingTest(BaseTest): 2064 def test_encoding_plain_file(self): 2065 # In Python 2.x, a plain file object is treated as having no encoding. 
        log = logging.getLogger("test")
        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
        os.close(fd)
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            handler = logging.FileHandler(fn, encoding="utf-8")
            log.addHandler(handler)
            try:
                # write non-ascii data to the log.
                log.warning(data)
            finally:
                log.removeHandler(handler)
                handler.close()
            # check we wrote exactly those bytes, ignoring trailing \n etc
            f = open(fn, encoding="utf-8")
            try:
                self.assertEqual(f.read().rstrip(), data)
            finally:
                f.close()
        finally:
            if os.path.isfile(fn):
                os.remove(fn)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        stream = io.BytesIO()
        writer = writer_class(stream, 'strict')
        handler = logging.StreamHandler(writer)
        log.addHandler(handler)
        try:
            log.warning(message)
        finally:
            log.removeHandler(handler)
            handler.close()
        # check we wrote exactly those bytes, ignoring trailing \n etc
        s = stream.getvalue()
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')


class WarningsTest(BaseTest):

    def test_warnings(self):
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)
            warnings.filterwarnings("always", category=UserWarning)
            stream = io.StringIO()
            h = logging.StreamHandler(stream)
            logger = logging.getLogger("py.warnings")
            logger.addHandler(h)
            warnings.warn("I'm warning you...")
            logger.removeHandler(h)
            s = stream.getvalue()
            h.close()
            self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)

            # See if an explicit file uses the original implementation
            a_file = io.StringIO()
            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                 a_file, "Dummy line")
            s = a_file.getvalue()
            a_file.close()
            self.assertEqual(s,
                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")

    def test_warnings_no_handlers(self):
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)

            # confirm our assumption: no loggers are set
            logger = logging.getLogger("py.warnings")
            self.assertEqual(logger.handlers, [])

            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
            self.assertEqual(len(logger.handlers), 1)
            self.assertIsInstance(logger.handlers[0], logging.NullHandler)


def formatFunc(format, datefmt=None):
    return logging.Formatter(format, datefmt)

class myCustomFormatter:
    def __init__(self, fmt, datefmt=None):
        pass

def handlerFunc():
    return logging.StreamHandler()

class CustomHandler(logging.StreamHandler):
    pass

class ConfigDictTest(BaseTest):

    """Reading logging config from a dictionary."""

    check_no_resource_warning = support.check_no_resource_warning
    expected_log_pat = r"^(\w+) \+\+ (\w+)$"

    # config0 is a standard configuration.
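    # All of the configs below follow the dictConfig schema: a mandatory
    # 'version' key (always 1 at present), optional 'formatters', 'filters',
    # 'handlers' and 'loggers' sections, and a 'root' section for the root
    # logger.  'ext://' references resolve to importable external objects
    # (e.g. 'ext://sys.stdout'), while 'cfg://' references point back into
    # the configuration dictionary itself.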
2173 config0 = { 2174 'version': 1, 2175 'formatters': { 2176 'form1' : { 2177 'format' : '%(levelname)s ++ %(message)s', 2178 }, 2179 }, 2180 'handlers' : { 2181 'hand1' : { 2182 'class' : 'logging.StreamHandler', 2183 'formatter' : 'form1', 2184 'level' : 'NOTSET', 2185 'stream' : 'ext://sys.stdout', 2186 }, 2187 }, 2188 'root' : { 2189 'level' : 'WARNING', 2190 'handlers' : ['hand1'], 2191 }, 2192 } 2193 2194 # config1 adds a little to the standard configuration. 2195 config1 = { 2196 'version': 1, 2197 'formatters': { 2198 'form1' : { 2199 'format' : '%(levelname)s ++ %(message)s', 2200 }, 2201 }, 2202 'handlers' : { 2203 'hand1' : { 2204 'class' : 'logging.StreamHandler', 2205 'formatter' : 'form1', 2206 'level' : 'NOTSET', 2207 'stream' : 'ext://sys.stdout', 2208 }, 2209 }, 2210 'loggers' : { 2211 'compiler.parser' : { 2212 'level' : 'DEBUG', 2213 'handlers' : ['hand1'], 2214 }, 2215 }, 2216 'root' : { 2217 'level' : 'WARNING', 2218 }, 2219 } 2220 2221 # config1a moves the handler to the root. Used with config8a 2222 config1a = { 2223 'version': 1, 2224 'formatters': { 2225 'form1' : { 2226 'format' : '%(levelname)s ++ %(message)s', 2227 }, 2228 }, 2229 'handlers' : { 2230 'hand1' : { 2231 'class' : 'logging.StreamHandler', 2232 'formatter' : 'form1', 2233 'level' : 'NOTSET', 2234 'stream' : 'ext://sys.stdout', 2235 }, 2236 }, 2237 'loggers' : { 2238 'compiler.parser' : { 2239 'level' : 'DEBUG', 2240 }, 2241 }, 2242 'root' : { 2243 'level' : 'WARNING', 2244 'handlers' : ['hand1'], 2245 }, 2246 } 2247 2248 # config2 has a subtle configuration error that should be reported 2249 config2 = { 2250 'version': 1, 2251 'formatters': { 2252 'form1' : { 2253 'format' : '%(levelname)s ++ %(message)s', 2254 }, 2255 }, 2256 'handlers' : { 2257 'hand1' : { 2258 'class' : 'logging.StreamHandler', 2259 'formatter' : 'form1', 2260 'level' : 'NOTSET', 2261 'stream' : 'ext://sys.stdbout', 2262 }, 2263 }, 2264 'loggers' : { 2265 'compiler.parser' : { 2266 'level' : 'DEBUG', 2267 'handlers' : ['hand1'], 2268 }, 2269 }, 2270 'root' : { 2271 'level' : 'WARNING', 2272 }, 2273 } 2274 2275 # As config1 but with a misspelt level on a handler 2276 config2a = { 2277 'version': 1, 2278 'formatters': { 2279 'form1' : { 2280 'format' : '%(levelname)s ++ %(message)s', 2281 }, 2282 }, 2283 'handlers' : { 2284 'hand1' : { 2285 'class' : 'logging.StreamHandler', 2286 'formatter' : 'form1', 2287 'level' : 'NTOSET', 2288 'stream' : 'ext://sys.stdout', 2289 }, 2290 }, 2291 'loggers' : { 2292 'compiler.parser' : { 2293 'level' : 'DEBUG', 2294 'handlers' : ['hand1'], 2295 }, 2296 }, 2297 'root' : { 2298 'level' : 'WARNING', 2299 }, 2300 } 2301 2302 2303 # As config1 but with a misspelt level on a logger 2304 config2b = { 2305 'version': 1, 2306 'formatters': { 2307 'form1' : { 2308 'format' : '%(levelname)s ++ %(message)s', 2309 }, 2310 }, 2311 'handlers' : { 2312 'hand1' : { 2313 'class' : 'logging.StreamHandler', 2314 'formatter' : 'form1', 2315 'level' : 'NOTSET', 2316 'stream' : 'ext://sys.stdout', 2317 }, 2318 }, 2319 'loggers' : { 2320 'compiler.parser' : { 2321 'level' : 'DEBUG', 2322 'handlers' : ['hand1'], 2323 }, 2324 }, 2325 'root' : { 2326 'level' : 'WRANING', 2327 }, 2328 } 2329 2330 # config3 has a less subtle configuration error 2331 config3 = { 2332 'version': 1, 2333 'formatters': { 2334 'form1' : { 2335 'format' : '%(levelname)s ++ %(message)s', 2336 }, 2337 }, 2338 'handlers' : { 2339 'hand1' : { 2340 'class' : 'logging.StreamHandler', 2341 'formatter' : 'misspelled_name', 2342 'level' : 'NOTSET', 2343 
'stream' : 'ext://sys.stdout', 2344 }, 2345 }, 2346 'loggers' : { 2347 'compiler.parser' : { 2348 'level' : 'DEBUG', 2349 'handlers' : ['hand1'], 2350 }, 2351 }, 2352 'root' : { 2353 'level' : 'WARNING', 2354 }, 2355 } 2356 2357 # config4 specifies a custom formatter class to be loaded 2358 config4 = { 2359 'version': 1, 2360 'formatters': { 2361 'form1' : { 2362 '()' : __name__ + '.ExceptionFormatter', 2363 'format' : '%(levelname)s:%(name)s:%(message)s', 2364 }, 2365 }, 2366 'handlers' : { 2367 'hand1' : { 2368 'class' : 'logging.StreamHandler', 2369 'formatter' : 'form1', 2370 'level' : 'NOTSET', 2371 'stream' : 'ext://sys.stdout', 2372 }, 2373 }, 2374 'root' : { 2375 'level' : 'NOTSET', 2376 'handlers' : ['hand1'], 2377 }, 2378 } 2379 2380 # As config4 but using an actual callable rather than a string 2381 config4a = { 2382 'version': 1, 2383 'formatters': { 2384 'form1' : { 2385 '()' : ExceptionFormatter, 2386 'format' : '%(levelname)s:%(name)s:%(message)s', 2387 }, 2388 'form2' : { 2389 '()' : __name__ + '.formatFunc', 2390 'format' : '%(levelname)s:%(name)s:%(message)s', 2391 }, 2392 'form3' : { 2393 '()' : formatFunc, 2394 'format' : '%(levelname)s:%(name)s:%(message)s', 2395 }, 2396 }, 2397 'handlers' : { 2398 'hand1' : { 2399 'class' : 'logging.StreamHandler', 2400 'formatter' : 'form1', 2401 'level' : 'NOTSET', 2402 'stream' : 'ext://sys.stdout', 2403 }, 2404 'hand2' : { 2405 '()' : handlerFunc, 2406 }, 2407 }, 2408 'root' : { 2409 'level' : 'NOTSET', 2410 'handlers' : ['hand1'], 2411 }, 2412 } 2413 2414 # config5 specifies a custom handler class to be loaded 2415 config5 = { 2416 'version': 1, 2417 'formatters': { 2418 'form1' : { 2419 'format' : '%(levelname)s ++ %(message)s', 2420 }, 2421 }, 2422 'handlers' : { 2423 'hand1' : { 2424 'class' : __name__ + '.CustomHandler', 2425 'formatter' : 'form1', 2426 'level' : 'NOTSET', 2427 'stream' : 'ext://sys.stdout', 2428 }, 2429 }, 2430 'loggers' : { 2431 'compiler.parser' : { 2432 'level' : 'DEBUG', 2433 'handlers' : ['hand1'], 2434 }, 2435 }, 2436 'root' : { 2437 'level' : 'WARNING', 2438 }, 2439 } 2440 2441 # config6 specifies a custom handler class to be loaded 2442 # but has bad arguments 2443 config6 = { 2444 'version': 1, 2445 'formatters': { 2446 'form1' : { 2447 'format' : '%(levelname)s ++ %(message)s', 2448 }, 2449 }, 2450 'handlers' : { 2451 'hand1' : { 2452 'class' : __name__ + '.CustomHandler', 2453 'formatter' : 'form1', 2454 'level' : 'NOTSET', 2455 'stream' : 'ext://sys.stdout', 2456 '9' : 'invalid parameter name', 2457 }, 2458 }, 2459 'loggers' : { 2460 'compiler.parser' : { 2461 'level' : 'DEBUG', 2462 'handlers' : ['hand1'], 2463 }, 2464 }, 2465 'root' : { 2466 'level' : 'WARNING', 2467 }, 2468 } 2469 2470 # config 7 does not define compiler.parser but defines compiler.lexer 2471 # so compiler.parser should be disabled after applying it 2472 config7 = { 2473 'version': 1, 2474 'formatters': { 2475 'form1' : { 2476 'format' : '%(levelname)s ++ %(message)s', 2477 }, 2478 }, 2479 'handlers' : { 2480 'hand1' : { 2481 'class' : 'logging.StreamHandler', 2482 'formatter' : 'form1', 2483 'level' : 'NOTSET', 2484 'stream' : 'ext://sys.stdout', 2485 }, 2486 }, 2487 'loggers' : { 2488 'compiler.lexer' : { 2489 'level' : 'DEBUG', 2490 'handlers' : ['hand1'], 2491 }, 2492 }, 2493 'root' : { 2494 'level' : 'WARNING', 2495 }, 2496 } 2497 2498 # config8 defines both compiler and compiler.lexer 2499 # so compiler.parser should not be disabled (since 2500 # compiler is defined) 2501 config8 = { 2502 'version': 1, 2503 
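        # dictConfig()'s disable_existing_loggers option defaults to True;
        # config8 turns it off so loggers created before the call (such as
        # compiler.parser) stay enabled.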
'disable_existing_loggers' : False, 2504 'formatters': { 2505 'form1' : { 2506 'format' : '%(levelname)s ++ %(message)s', 2507 }, 2508 }, 2509 'handlers' : { 2510 'hand1' : { 2511 'class' : 'logging.StreamHandler', 2512 'formatter' : 'form1', 2513 'level' : 'NOTSET', 2514 'stream' : 'ext://sys.stdout', 2515 }, 2516 }, 2517 'loggers' : { 2518 'compiler' : { 2519 'level' : 'DEBUG', 2520 'handlers' : ['hand1'], 2521 }, 2522 'compiler.lexer' : { 2523 }, 2524 }, 2525 'root' : { 2526 'level' : 'WARNING', 2527 }, 2528 } 2529 2530 # config8a disables existing loggers 2531 config8a = { 2532 'version': 1, 2533 'disable_existing_loggers' : True, 2534 'formatters': { 2535 'form1' : { 2536 'format' : '%(levelname)s ++ %(message)s', 2537 }, 2538 }, 2539 'handlers' : { 2540 'hand1' : { 2541 'class' : 'logging.StreamHandler', 2542 'formatter' : 'form1', 2543 'level' : 'NOTSET', 2544 'stream' : 'ext://sys.stdout', 2545 }, 2546 }, 2547 'loggers' : { 2548 'compiler' : { 2549 'level' : 'DEBUG', 2550 'handlers' : ['hand1'], 2551 }, 2552 'compiler.lexer' : { 2553 }, 2554 }, 2555 'root' : { 2556 'level' : 'WARNING', 2557 }, 2558 } 2559 2560 config9 = { 2561 'version': 1, 2562 'formatters': { 2563 'form1' : { 2564 'format' : '%(levelname)s ++ %(message)s', 2565 }, 2566 }, 2567 'handlers' : { 2568 'hand1' : { 2569 'class' : 'logging.StreamHandler', 2570 'formatter' : 'form1', 2571 'level' : 'WARNING', 2572 'stream' : 'ext://sys.stdout', 2573 }, 2574 }, 2575 'loggers' : { 2576 'compiler.parser' : { 2577 'level' : 'WARNING', 2578 'handlers' : ['hand1'], 2579 }, 2580 }, 2581 'root' : { 2582 'level' : 'NOTSET', 2583 }, 2584 } 2585 2586 config9a = { 2587 'version': 1, 2588 'incremental' : True, 2589 'handlers' : { 2590 'hand1' : { 2591 'level' : 'WARNING', 2592 }, 2593 }, 2594 'loggers' : { 2595 'compiler.parser' : { 2596 'level' : 'INFO', 2597 }, 2598 }, 2599 } 2600 2601 config9b = { 2602 'version': 1, 2603 'incremental' : True, 2604 'handlers' : { 2605 'hand1' : { 2606 'level' : 'INFO', 2607 }, 2608 }, 2609 'loggers' : { 2610 'compiler.parser' : { 2611 'level' : 'INFO', 2612 }, 2613 }, 2614 } 2615 2616 # As config1 but with a filter added 2617 config10 = { 2618 'version': 1, 2619 'formatters': { 2620 'form1' : { 2621 'format' : '%(levelname)s ++ %(message)s', 2622 }, 2623 }, 2624 'filters' : { 2625 'filt1' : { 2626 'name' : 'compiler.parser', 2627 }, 2628 }, 2629 'handlers' : { 2630 'hand1' : { 2631 'class' : 'logging.StreamHandler', 2632 'formatter' : 'form1', 2633 'level' : 'NOTSET', 2634 'stream' : 'ext://sys.stdout', 2635 'filters' : ['filt1'], 2636 }, 2637 }, 2638 'loggers' : { 2639 'compiler.parser' : { 2640 'level' : 'DEBUG', 2641 'filters' : ['filt1'], 2642 }, 2643 }, 2644 'root' : { 2645 'level' : 'WARNING', 2646 'handlers' : ['hand1'], 2647 }, 2648 } 2649 2650 # As config1 but using cfg:// references 2651 config11 = { 2652 'version': 1, 2653 'true_formatters': { 2654 'form1' : { 2655 'format' : '%(levelname)s ++ %(message)s', 2656 }, 2657 }, 2658 'handler_configs': { 2659 'hand1' : { 2660 'class' : 'logging.StreamHandler', 2661 'formatter' : 'form1', 2662 'level' : 'NOTSET', 2663 'stream' : 'ext://sys.stdout', 2664 }, 2665 }, 2666 'formatters' : 'cfg://true_formatters', 2667 'handlers' : { 2668 'hand1' : 'cfg://handler_configs[hand1]', 2669 }, 2670 'loggers' : { 2671 'compiler.parser' : { 2672 'level' : 'DEBUG', 2673 'handlers' : ['hand1'], 2674 }, 2675 }, 2676 'root' : { 2677 'level' : 'WARNING', 2678 }, 2679 } 2680 2681 # As config11 but missing the version key 2682 config12 = { 2683 'true_formatters': { 
2684 'form1' : { 2685 'format' : '%(levelname)s ++ %(message)s', 2686 }, 2687 }, 2688 'handler_configs': { 2689 'hand1' : { 2690 'class' : 'logging.StreamHandler', 2691 'formatter' : 'form1', 2692 'level' : 'NOTSET', 2693 'stream' : 'ext://sys.stdout', 2694 }, 2695 }, 2696 'formatters' : 'cfg://true_formatters', 2697 'handlers' : { 2698 'hand1' : 'cfg://handler_configs[hand1]', 2699 }, 2700 'loggers' : { 2701 'compiler.parser' : { 2702 'level' : 'DEBUG', 2703 'handlers' : ['hand1'], 2704 }, 2705 }, 2706 'root' : { 2707 'level' : 'WARNING', 2708 }, 2709 } 2710 2711 # As config11 but using an unsupported version 2712 config13 = { 2713 'version': 2, 2714 'true_formatters': { 2715 'form1' : { 2716 'format' : '%(levelname)s ++ %(message)s', 2717 }, 2718 }, 2719 'handler_configs': { 2720 'hand1' : { 2721 'class' : 'logging.StreamHandler', 2722 'formatter' : 'form1', 2723 'level' : 'NOTSET', 2724 'stream' : 'ext://sys.stdout', 2725 }, 2726 }, 2727 'formatters' : 'cfg://true_formatters', 2728 'handlers' : { 2729 'hand1' : 'cfg://handler_configs[hand1]', 2730 }, 2731 'loggers' : { 2732 'compiler.parser' : { 2733 'level' : 'DEBUG', 2734 'handlers' : ['hand1'], 2735 }, 2736 }, 2737 'root' : { 2738 'level' : 'WARNING', 2739 }, 2740 } 2741 2742 # As config0, but with properties 2743 config14 = { 2744 'version': 1, 2745 'formatters': { 2746 'form1' : { 2747 'format' : '%(levelname)s ++ %(message)s', 2748 }, 2749 }, 2750 'handlers' : { 2751 'hand1' : { 2752 'class' : 'logging.StreamHandler', 2753 'formatter' : 'form1', 2754 'level' : 'NOTSET', 2755 'stream' : 'ext://sys.stdout', 2756 '.': { 2757 'foo': 'bar', 2758 'terminator': '!\n', 2759 } 2760 }, 2761 }, 2762 'root' : { 2763 'level' : 'WARNING', 2764 'handlers' : ['hand1'], 2765 }, 2766 } 2767 2768 out_of_order = { 2769 "version": 1, 2770 "formatters": { 2771 "mySimpleFormatter": { 2772 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2773 "style": "$" 2774 } 2775 }, 2776 "handlers": { 2777 "fileGlobal": { 2778 "class": "logging.StreamHandler", 2779 "level": "DEBUG", 2780 "formatter": "mySimpleFormatter" 2781 }, 2782 "bufferGlobal": { 2783 "class": "logging.handlers.MemoryHandler", 2784 "capacity": 5, 2785 "formatter": "mySimpleFormatter", 2786 "target": "fileGlobal", 2787 "level": "DEBUG" 2788 } 2789 }, 2790 "loggers": { 2791 "mymodule": { 2792 "level": "DEBUG", 2793 "handlers": ["bufferGlobal"], 2794 "propagate": "true" 2795 } 2796 } 2797 } 2798 2799 # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False 2800 custom_formatter_class_validate = { 2801 'version': 1, 2802 'formatters': { 2803 'form1': { 2804 '()': __name__ + '.ExceptionFormatter', 2805 'format': '%(levelname)s:%(name)s:%(message)s', 2806 'validate': False, 2807 }, 2808 }, 2809 'handlers' : { 2810 'hand1' : { 2811 'class': 'logging.StreamHandler', 2812 'formatter': 'form1', 2813 'level': 'NOTSET', 2814 'stream': 'ext://sys.stdout', 2815 }, 2816 }, 2817 "loggers": { 2818 "my_test_logger_custom_formatter": { 2819 "level": "DEBUG", 2820 "handlers": ["hand1"], 2821 "propagate": "true" 2822 } 2823 } 2824 } 2825 2826 # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False 2827 custom_formatter_class_validate2 = { 2828 'version': 1, 2829 'formatters': { 2830 'form1': { 2831 'class': __name__ + '.ExceptionFormatter', 2832 'format': '%(levelname)s:%(name)s:%(message)s', 2833 'validate': False, 2834 }, 2835 }, 2836 'handlers' : { 2837 'hand1' : { 2838 'class': 'logging.StreamHandler', 2839 'formatter': 
'form1', 2840 'level': 'NOTSET', 2841 'stream': 'ext://sys.stdout', 2842 }, 2843 }, 2844 "loggers": { 2845 "my_test_logger_custom_formatter": { 2846 "level": "DEBUG", 2847 "handlers": ["hand1"], 2848 "propagate": "true" 2849 } 2850 } 2851 } 2852 2853 # Configuration with custom class that is not inherited from logging.Formatter 2854 custom_formatter_class_validate3 = { 2855 'version': 1, 2856 'formatters': { 2857 'form1': { 2858 'class': __name__ + '.myCustomFormatter', 2859 'format': '%(levelname)s:%(name)s:%(message)s', 2860 'validate': False, 2861 }, 2862 }, 2863 'handlers' : { 2864 'hand1' : { 2865 'class': 'logging.StreamHandler', 2866 'formatter': 'form1', 2867 'level': 'NOTSET', 2868 'stream': 'ext://sys.stdout', 2869 }, 2870 }, 2871 "loggers": { 2872 "my_test_logger_custom_formatter": { 2873 "level": "DEBUG", 2874 "handlers": ["hand1"], 2875 "propagate": "true" 2876 } 2877 } 2878 } 2879 2880 # Configuration with custom function and 'validate' set to False 2881 custom_formatter_with_function = { 2882 'version': 1, 2883 'formatters': { 2884 'form1': { 2885 '()': formatFunc, 2886 'format': '%(levelname)s:%(name)s:%(message)s', 2887 'validate': False, 2888 }, 2889 }, 2890 'handlers' : { 2891 'hand1' : { 2892 'class': 'logging.StreamHandler', 2893 'formatter': 'form1', 2894 'level': 'NOTSET', 2895 'stream': 'ext://sys.stdout', 2896 }, 2897 }, 2898 "loggers": { 2899 "my_test_logger_custom_formatter": { 2900 "level": "DEBUG", 2901 "handlers": ["hand1"], 2902 "propagate": "true" 2903 } 2904 } 2905 } 2906 2907 def apply_config(self, conf): 2908 logging.config.dictConfig(conf) 2909 2910 def test_config0_ok(self): 2911 # A simple config which overrides the default settings. 2912 with support.captured_stdout() as output: 2913 self.apply_config(self.config0) 2914 logger = logging.getLogger() 2915 # Won't output anything 2916 logger.info(self.next_message()) 2917 # Outputs a message 2918 logger.error(self.next_message()) 2919 self.assert_log_lines([ 2920 ('ERROR', '2'), 2921 ], stream=output) 2922 # Original logger output is empty. 2923 self.assert_log_lines([]) 2924 2925 def test_config1_ok(self, config=config1): 2926 # A config defining a sub-parser as well. 2927 with support.captured_stdout() as output: 2928 self.apply_config(config) 2929 logger = logging.getLogger("compiler.parser") 2930 # Both will output a message 2931 logger.info(self.next_message()) 2932 logger.error(self.next_message()) 2933 self.assert_log_lines([ 2934 ('INFO', '1'), 2935 ('ERROR', '2'), 2936 ], stream=output) 2937 # Original logger output is empty. 2938 self.assert_log_lines([]) 2939 2940 def test_config2_failure(self): 2941 # A simple config which overrides the default settings. 2942 self.assertRaises(Exception, self.apply_config, self.config2) 2943 2944 def test_config2a_failure(self): 2945 # A simple config which overrides the default settings. 2946 self.assertRaises(Exception, self.apply_config, self.config2a) 2947 2948 def test_config2b_failure(self): 2949 # A simple config which overrides the default settings. 2950 self.assertRaises(Exception, self.apply_config, self.config2b) 2951 2952 def test_config3_failure(self): 2953 # A simple config which overrides the default settings. 2954 self.assertRaises(Exception, self.apply_config, self.config3) 2955 2956 def test_config4_ok(self): 2957 # A config specifying a custom formatter class. 
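        # config4 builds its formatter through the special '()' key, which
        # names a factory callable (the module-level ExceptionFormatter);
        # the remaining keys in that section are passed to the factory as
        # keyword arguments.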
2958 with support.captured_stdout() as output: 2959 self.apply_config(self.config4) 2960 #logger = logging.getLogger() 2961 try: 2962 raise RuntimeError() 2963 except RuntimeError: 2964 logging.exception("just testing") 2965 sys.stdout.seek(0) 2966 self.assertEqual(output.getvalue(), 2967 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2968 # Original logger output is empty 2969 self.assert_log_lines([]) 2970 2971 def test_config4a_ok(self): 2972 # A config specifying a custom formatter class. 2973 with support.captured_stdout() as output: 2974 self.apply_config(self.config4a) 2975 #logger = logging.getLogger() 2976 try: 2977 raise RuntimeError() 2978 except RuntimeError: 2979 logging.exception("just testing") 2980 sys.stdout.seek(0) 2981 self.assertEqual(output.getvalue(), 2982 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2983 # Original logger output is empty 2984 self.assert_log_lines([]) 2985 2986 def test_config5_ok(self): 2987 self.test_config1_ok(config=self.config5) 2988 2989 def test_config6_failure(self): 2990 self.assertRaises(Exception, self.apply_config, self.config6) 2991 2992 def test_config7_ok(self): 2993 with support.captured_stdout() as output: 2994 self.apply_config(self.config1) 2995 logger = logging.getLogger("compiler.parser") 2996 # Both will output a message 2997 logger.info(self.next_message()) 2998 logger.error(self.next_message()) 2999 self.assert_log_lines([ 3000 ('INFO', '1'), 3001 ('ERROR', '2'), 3002 ], stream=output) 3003 # Original logger output is empty. 3004 self.assert_log_lines([]) 3005 with support.captured_stdout() as output: 3006 self.apply_config(self.config7) 3007 logger = logging.getLogger("compiler.parser") 3008 self.assertTrue(logger.disabled) 3009 logger = logging.getLogger("compiler.lexer") 3010 # Both will output a message 3011 logger.info(self.next_message()) 3012 logger.error(self.next_message()) 3013 self.assert_log_lines([ 3014 ('INFO', '3'), 3015 ('ERROR', '4'), 3016 ], stream=output) 3017 # Original logger output is empty. 3018 self.assert_log_lines([]) 3019 3020 # Same as test_config_7_ok but don't disable old loggers. 3021 def test_config_8_ok(self): 3022 with support.captured_stdout() as output: 3023 self.apply_config(self.config1) 3024 logger = logging.getLogger("compiler.parser") 3025 # All will output a message 3026 logger.info(self.next_message()) 3027 logger.error(self.next_message()) 3028 self.assert_log_lines([ 3029 ('INFO', '1'), 3030 ('ERROR', '2'), 3031 ], stream=output) 3032 # Original logger output is empty. 3033 self.assert_log_lines([]) 3034 with support.captured_stdout() as output: 3035 self.apply_config(self.config8) 3036 logger = logging.getLogger("compiler.parser") 3037 self.assertFalse(logger.disabled) 3038 # Both will output a message 3039 logger.info(self.next_message()) 3040 logger.error(self.next_message()) 3041 logger = logging.getLogger("compiler.lexer") 3042 # Both will output a message 3043 logger.info(self.next_message()) 3044 logger.error(self.next_message()) 3045 self.assert_log_lines([ 3046 ('INFO', '3'), 3047 ('ERROR', '4'), 3048 ('INFO', '5'), 3049 ('ERROR', '6'), 3050 ], stream=output) 3051 # Original logger output is empty. 3052 self.assert_log_lines([]) 3053 3054 def test_config_8a_ok(self): 3055 with support.captured_stdout() as output: 3056 self.apply_config(self.config1a) 3057 logger = logging.getLogger("compiler.parser") 3058 # See issue #11424. 
compiler-hyphenated sorts 3059 # between compiler and compiler.xyz and this 3060 # was preventing compiler.xyz from being included 3061 # in the child loggers of compiler because of an 3062 # overzealous loop termination condition. 3063 hyphenated = logging.getLogger('compiler-hyphenated') 3064 # All will output a message 3065 logger.info(self.next_message()) 3066 logger.error(self.next_message()) 3067 hyphenated.critical(self.next_message()) 3068 self.assert_log_lines([ 3069 ('INFO', '1'), 3070 ('ERROR', '2'), 3071 ('CRITICAL', '3'), 3072 ], stream=output) 3073 # Original logger output is empty. 3074 self.assert_log_lines([]) 3075 with support.captured_stdout() as output: 3076 self.apply_config(self.config8a) 3077 logger = logging.getLogger("compiler.parser") 3078 self.assertFalse(logger.disabled) 3079 # Both will output a message 3080 logger.info(self.next_message()) 3081 logger.error(self.next_message()) 3082 logger = logging.getLogger("compiler.lexer") 3083 # Both will output a message 3084 logger.info(self.next_message()) 3085 logger.error(self.next_message()) 3086 # Will not appear 3087 hyphenated.critical(self.next_message()) 3088 self.assert_log_lines([ 3089 ('INFO', '4'), 3090 ('ERROR', '5'), 3091 ('INFO', '6'), 3092 ('ERROR', '7'), 3093 ], stream=output) 3094 # Original logger output is empty. 3095 self.assert_log_lines([]) 3096 3097 def test_config_9_ok(self): 3098 with support.captured_stdout() as output: 3099 self.apply_config(self.config9) 3100 logger = logging.getLogger("compiler.parser") 3101 # Nothing will be output since both handler and logger are set to WARNING 3102 logger.info(self.next_message()) 3103 self.assert_log_lines([], stream=output) 3104 self.apply_config(self.config9a) 3105 # Nothing will be output since handler is still set to WARNING 3106 logger.info(self.next_message()) 3107 self.assert_log_lines([], stream=output) 3108 self.apply_config(self.config9b) 3109 # Message should now be output 3110 logger.info(self.next_message()) 3111 self.assert_log_lines([ 3112 ('INFO', '3'), 3113 ], stream=output) 3114 3115 def test_config_10_ok(self): 3116 with support.captured_stdout() as output: 3117 self.apply_config(self.config10) 3118 logger = logging.getLogger("compiler.parser") 3119 logger.warning(self.next_message()) 3120 logger = logging.getLogger('compiler') 3121 # Not output, because filtered 3122 logger.warning(self.next_message()) 3123 logger = logging.getLogger('compiler.lexer') 3124 # Not output, because filtered 3125 logger.warning(self.next_message()) 3126 logger = logging.getLogger("compiler.parser.codegen") 3127 # Output, as not filtered 3128 logger.error(self.next_message()) 3129 self.assert_log_lines([ 3130 ('WARNING', '1'), 3131 ('ERROR', '4'), 3132 ], stream=output) 3133 3134 def test_config11_ok(self): 3135 self.test_config1_ok(self.config11) 3136 3137 def test_config12_failure(self): 3138 self.assertRaises(Exception, self.apply_config, self.config12) 3139 3140 def test_config13_failure(self): 3141 self.assertRaises(Exception, self.apply_config, self.config13) 3142 3143 def test_config14_ok(self): 3144 with support.captured_stdout() as output: 3145 self.apply_config(self.config14) 3146 h = logging._handlers['hand1'] 3147 self.assertEqual(h.foo, 'bar') 3148 self.assertEqual(h.terminator, '!\n') 3149 logging.warning('Exclamation') 3150 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 3151 3152 def test_config15_ok(self): 3153 3154 def cleanup(h1, fn): 3155 h1.close() 3156 os.remove(fn) 3157 3158 with self.check_no_resource_warning(): 3159 
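            # Applying the same FileHandler configuration twice below must
            # not leak the first handler's open file; a leaked stream would
            # surface as a ResourceWarning inside this block.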
fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 3160 os.close(fd) 3161 3162 config = { 3163 "version": 1, 3164 "handlers": { 3165 "file": { 3166 "class": "logging.FileHandler", 3167 "filename": fn 3168 } 3169 }, 3170 "root": { 3171 "handlers": ["file"] 3172 } 3173 } 3174 3175 self.apply_config(config) 3176 self.apply_config(config) 3177 3178 handler = logging.root.handlers[0] 3179 self.addCleanup(cleanup, handler, fn) 3180 3181 def setup_via_listener(self, text, verify=None): 3182 text = text.encode("utf-8") 3183 # Ask for a randomly assigned port (by using port 0) 3184 t = logging.config.listen(0, verify) 3185 t.start() 3186 t.ready.wait() 3187 # Now get the port allocated 3188 port = t.port 3189 t.ready.clear() 3190 try: 3191 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3192 sock.settimeout(2.0) 3193 sock.connect(('localhost', port)) 3194 3195 slen = struct.pack('>L', len(text)) 3196 s = slen + text 3197 sentsofar = 0 3198 left = len(s) 3199 while left > 0: 3200 sent = sock.send(s[sentsofar:]) 3201 sentsofar += sent 3202 left -= sent 3203 sock.close() 3204 finally: 3205 t.ready.wait(2.0) 3206 logging.config.stopListening() 3207 support.join_thread(t, 2.0) 3208 3209 def test_listen_config_10_ok(self): 3210 with support.captured_stdout() as output: 3211 self.setup_via_listener(json.dumps(self.config10)) 3212 logger = logging.getLogger("compiler.parser") 3213 logger.warning(self.next_message()) 3214 logger = logging.getLogger('compiler') 3215 # Not output, because filtered 3216 logger.warning(self.next_message()) 3217 logger = logging.getLogger('compiler.lexer') 3218 # Not output, because filtered 3219 logger.warning(self.next_message()) 3220 logger = logging.getLogger("compiler.parser.codegen") 3221 # Output, as not filtered 3222 logger.error(self.next_message()) 3223 self.assert_log_lines([ 3224 ('WARNING', '1'), 3225 ('ERROR', '4'), 3226 ], stream=output) 3227 3228 def test_listen_config_1_ok(self): 3229 with support.captured_stdout() as output: 3230 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 3231 logger = logging.getLogger("compiler.parser") 3232 # Both will output a message 3233 logger.info(self.next_message()) 3234 logger.error(self.next_message()) 3235 self.assert_log_lines([ 3236 ('INFO', '1'), 3237 ('ERROR', '2'), 3238 ], stream=output) 3239 # Original logger output is empty. 3240 self.assert_log_lines([]) 3241 3242 def test_listen_verify(self): 3243 3244 def verify_fail(stuff): 3245 return None 3246 3247 def verify_reverse(stuff): 3248 return stuff[::-1] 3249 3250 logger = logging.getLogger("compiler.parser") 3251 to_send = textwrap.dedent(ConfigFileTest.config1) 3252 # First, specify a verification function that will fail. 3253 # We expect to see no output, since our configuration 3254 # never took effect. 3255 with support.captured_stdout() as output: 3256 self.setup_via_listener(to_send, verify_fail) 3257 # Both will output a message 3258 logger.info(self.next_message()) 3259 logger.error(self.next_message()) 3260 self.assert_log_lines([], stream=output) 3261 # Original logger output has the stuff we logged. 3262 self.assert_log_lines([ 3263 ('INFO', '1'), 3264 ('ERROR', '2'), 3265 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3266 3267 # Now, perform no verification. Our configuration 3268 # should take effect. 
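        # setup_via_listener() frames each configuration the way
        # logging.config.listen() expects: a 4-byte big-endian length
        # prefix followed by the payload, roughly
        #     payload = text.encode("utf-8")
        #     sock.sendall(struct.pack('>L', len(payload)) + payload)
        # (the helper above uses an explicit send() loop instead of sendall).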

        with support.captured_stdout() as output:
            self.setup_via_listener(to_send)    # no verify callable specified
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
        self.assert_log_lines([
            ('INFO', '3'),
            ('ERROR', '4'),
        ], stream=output)
        # Original logger output still has the stuff we logged before.
        self.assert_log_lines([
            ('INFO', '1'),
            ('ERROR', '2'),
        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")

        # Now, perform verification which transforms the bytes.

        with support.captured_stdout() as output:
            self.setup_via_listener(to_send[::-1], verify_reverse)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
        self.assert_log_lines([
            ('INFO', '5'),
            ('ERROR', '6'),
        ], stream=output)
        # Original logger output still has the stuff we logged before.
        self.assert_log_lines([
            ('INFO', '1'),
            ('ERROR', '2'),
        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")

    def test_out_of_order(self):
        self.assertRaises(ValueError, self.apply_config, self.out_of_order)

    def test_out_of_order_with_dollar_style(self):
        config = copy.deepcopy(self.out_of_order)
        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"

        self.apply_config(config)
        handler = logging.getLogger('mymodule').handlers[0]
        self.assertIsInstance(handler.target, logging.Handler)
        self.assertIsInstance(handler.formatter._style,
                              logging.StringTemplateStyle)

    def test_custom_formatter_class_with_validate(self):
        self.apply_config(self.custom_formatter_class_validate)
        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
        self.assertIsInstance(handler.formatter, ExceptionFormatter)

    def test_custom_formatter_class_with_validate2(self):
        self.apply_config(self.custom_formatter_class_validate2)
        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
        self.assertIsInstance(handler.formatter, ExceptionFormatter)

    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
        config = self.custom_formatter_class_validate.copy()
        config['formatters']['form1']['style'] = "$"

        # Exception should not be raised, as we have configured 'validate' to False
        self.apply_config(config)
        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
        self.assertIsInstance(handler.formatter, ExceptionFormatter)

    def test_custom_formatter_class_with_validate3(self):
        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)

    def test_custom_formatter_function_with_validate(self):
        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)

    def test_baseconfig(self):
        d = {
            'atuple': (1, 2, 3),
            'alist': ['a', 'b', 'c'],
            'adict': {'d': 'e', 'f': 3 },
            'nest1': ('g', ('h', 'i'), 'j'),
            'nest2': ['k', ['l', 'm'], 'n'],
            'nest3': ['o', 'cfg://alist', 'p'],
        }
        bc = logging.config.BaseConfigurator(d)
        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
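        # cfg:// paths accept both attribute-style and index-style access
        # into the dictionary above, so 'cfg://adict.d' and 'cfg://adict[f]'
        # go through the same conversion machinery checked here.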
self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3356 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3357 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3358 v = bc.convert('cfg://nest3') 3359 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3360 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3361 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3362 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3363 3364 def test_namedtuple(self): 3365 # see bpo-39142 3366 from collections import namedtuple 3367 3368 class MyHandler(logging.StreamHandler): 3369 def __init__(self, resource, *args, **kwargs): 3370 super().__init__(*args, **kwargs) 3371 self.resource: namedtuple = resource 3372 3373 def emit(self, record): 3374 record.msg += f' {self.resource.type}' 3375 return super().emit(record) 3376 3377 Resource = namedtuple('Resource', ['type', 'labels']) 3378 resource = Resource(type='my_type', labels=['a']) 3379 3380 config = { 3381 'version': 1, 3382 'handlers': { 3383 'myhandler': { 3384 '()': MyHandler, 3385 'resource': resource 3386 } 3387 }, 3388 'root': {'level': 'INFO', 'handlers': ['myhandler']}, 3389 } 3390 with support.captured_stderr() as stderr: 3391 self.apply_config(config) 3392 logging.info('some log') 3393 self.assertEqual(stderr.getvalue(), 'some log my_type\n') 3394 3395class ManagerTest(BaseTest): 3396 def test_manager_loggerclass(self): 3397 logged = [] 3398 3399 class MyLogger(logging.Logger): 3400 def _log(self, level, msg, args, exc_info=None, extra=None): 3401 logged.append(msg) 3402 3403 man = logging.Manager(None) 3404 self.assertRaises(TypeError, man.setLoggerClass, int) 3405 man.setLoggerClass(MyLogger) 3406 logger = man.getLogger('test') 3407 logger.warning('should appear in logged') 3408 logging.warning('should not appear in logged') 3409 3410 self.assertEqual(logged, ['should appear in logged']) 3411 3412 def test_set_log_record_factory(self): 3413 man = logging.Manager(None) 3414 expected = object() 3415 man.setLogRecordFactory(expected) 3416 self.assertEqual(man.logRecordFactory, expected) 3417 3418class ChildLoggerTest(BaseTest): 3419 def test_child_loggers(self): 3420 r = logging.getLogger() 3421 l1 = logging.getLogger('abc') 3422 l2 = logging.getLogger('def.ghi') 3423 c1 = r.getChild('xyz') 3424 c2 = r.getChild('uvw.xyz') 3425 self.assertIs(c1, logging.getLogger('xyz')) 3426 self.assertIs(c2, logging.getLogger('uvw.xyz')) 3427 c1 = l1.getChild('def') 3428 c2 = c1.getChild('ghi') 3429 c3 = l1.getChild('def.ghi') 3430 self.assertIs(c1, logging.getLogger('abc.def')) 3431 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 3432 self.assertIs(c2, c3) 3433 3434 3435class DerivedLogRecord(logging.LogRecord): 3436 pass 3437 3438class LogRecordFactoryTest(BaseTest): 3439 3440 def setUp(self): 3441 class CheckingFilter(logging.Filter): 3442 def __init__(self, cls): 3443 self.cls = cls 3444 3445 def filter(self, record): 3446 t = type(record) 3447 if t is not self.cls: 3448 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 3449 self.cls) 3450 raise TypeError(msg) 3451 return True 3452 3453 BaseTest.setUp(self) 3454 self.filter = CheckingFilter(DerivedLogRecord) 3455 self.root_logger.addFilter(self.filter) 3456 self.orig_factory = logging.getLogRecordFactory() 3457 3458 def tearDown(self): 3459 self.root_logger.removeFilter(self.filter) 3460 BaseTest.tearDown(self) 3461 logging.setLogRecordFactory(self.orig_factory) 3462 3463 def test_logrecord_class(self): 3464 self.assertRaises(TypeError, self.root_logger.warning, 3465 
self.next_message()) 3466 logging.setLogRecordFactory(DerivedLogRecord) 3467 self.root_logger.error(self.next_message()) 3468 self.assert_log_lines([ 3469 ('root', 'ERROR', '2'), 3470 ]) 3471 3472 3473class QueueHandlerTest(BaseTest): 3474 # Do not bother with a logger name group. 3475 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 3476 3477 def setUp(self): 3478 BaseTest.setUp(self) 3479 self.queue = queue.Queue(-1) 3480 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 3481 self.name = 'que' 3482 self.que_logger = logging.getLogger('que') 3483 self.que_logger.propagate = False 3484 self.que_logger.setLevel(logging.WARNING) 3485 self.que_logger.addHandler(self.que_hdlr) 3486 3487 def tearDown(self): 3488 self.que_hdlr.close() 3489 BaseTest.tearDown(self) 3490 3491 def test_queue_handler(self): 3492 self.que_logger.debug(self.next_message()) 3493 self.assertRaises(queue.Empty, self.queue.get_nowait) 3494 self.que_logger.info(self.next_message()) 3495 self.assertRaises(queue.Empty, self.queue.get_nowait) 3496 msg = self.next_message() 3497 self.que_logger.warning(msg) 3498 data = self.queue.get_nowait() 3499 self.assertTrue(isinstance(data, logging.LogRecord)) 3500 self.assertEqual(data.name, self.que_logger.name) 3501 self.assertEqual((data.msg, data.args), (msg, None)) 3502 3503 def test_formatting(self): 3504 msg = self.next_message() 3505 levelname = logging.getLevelName(logging.WARNING) 3506 log_format_str = '{name} -> {levelname}: {message}' 3507 formatted_msg = log_format_str.format(name=self.name, 3508 levelname=levelname, message=msg) 3509 formatter = logging.Formatter(self.log_format) 3510 self.que_hdlr.setFormatter(formatter) 3511 self.que_logger.warning(msg) 3512 log_record = self.queue.get_nowait() 3513 self.assertEqual(formatted_msg, log_record.msg) 3514 self.assertEqual(formatted_msg, log_record.message) 3515 3516 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3517 'logging.handlers.QueueListener required for this test') 3518 def test_queue_listener(self): 3519 handler = support.TestHandler(support.Matcher()) 3520 listener = logging.handlers.QueueListener(self.queue, handler) 3521 listener.start() 3522 try: 3523 self.que_logger.warning(self.next_message()) 3524 self.que_logger.error(self.next_message()) 3525 self.que_logger.critical(self.next_message()) 3526 finally: 3527 listener.stop() 3528 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 3529 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 3530 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 3531 handler.close() 3532 3533 # Now test with respect_handler_level set 3534 3535 handler = support.TestHandler(support.Matcher()) 3536 handler.setLevel(logging.CRITICAL) 3537 listener = logging.handlers.QueueListener(self.queue, handler, 3538 respect_handler_level=True) 3539 listener.start() 3540 try: 3541 self.que_logger.warning(self.next_message()) 3542 self.que_logger.error(self.next_message()) 3543 self.que_logger.critical(self.next_message()) 3544 finally: 3545 listener.stop() 3546 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 3547 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 3548 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 3549 handler.close() 3550 3551 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3552 'logging.handlers.QueueListener required for this test') 3553 def test_queue_listener_with_StreamHandler(self): 3554 # Test that traceback 
only appends once (bpo-34334). 3555 listener = logging.handlers.QueueListener(self.queue, self.root_hdlr) 3556 listener.start() 3557 try: 3558 1 / 0 3559 except ZeroDivisionError as e: 3560 exc = e 3561 self.que_logger.exception(self.next_message(), exc_info=exc) 3562 listener.stop() 3563 self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1) 3564 3565 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3566 'logging.handlers.QueueListener required for this test') 3567 def test_queue_listener_with_multiple_handlers(self): 3568 # Test that queue handler format doesn't affect other handler formats (bpo-35726). 3569 self.que_hdlr.setFormatter(self.root_formatter) 3570 self.que_logger.addHandler(self.root_hdlr) 3571 3572 listener = logging.handlers.QueueListener(self.queue, self.que_hdlr) 3573 listener.start() 3574 self.que_logger.error("error") 3575 listener.stop() 3576 self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error") 3577 3578if hasattr(logging.handlers, 'QueueListener'): 3579 import multiprocessing 3580 from unittest.mock import patch 3581 3582 class QueueListenerTest(BaseTest): 3583 """ 3584 Tests based on patch submitted for issue #27930. Ensure that 3585 QueueListener handles all log messages. 3586 """ 3587 3588 repeat = 20 3589 3590 @staticmethod 3591 def setup_and_log(log_queue, ident): 3592 """ 3593 Creates a logger with a QueueHandler that logs to a queue read by a 3594 QueueListener. Starts the listener, logs five messages, and stops 3595 the listener. 3596 """ 3597 logger = logging.getLogger('test_logger_with_id_%s' % ident) 3598 logger.setLevel(logging.DEBUG) 3599 handler = logging.handlers.QueueHandler(log_queue) 3600 logger.addHandler(handler) 3601 listener = logging.handlers.QueueListener(log_queue) 3602 listener.start() 3603 3604 logger.info('one') 3605 logger.info('two') 3606 logger.info('three') 3607 logger.info('four') 3608 logger.info('five') 3609 3610 listener.stop() 3611 logger.removeHandler(handler) 3612 handler.close() 3613 3614 @patch.object(logging.handlers.QueueListener, 'handle') 3615 def test_handle_called_with_queue_queue(self, mock_handle): 3616 for i in range(self.repeat): 3617 log_queue = queue.Queue() 3618 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3619 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3620 'correct number of handled log messages') 3621 3622 @patch.object(logging.handlers.QueueListener, 'handle') 3623 def test_handle_called_with_mp_queue(self, mock_handle): 3624 # bpo-28668: The multiprocessing (mp) module is not functional 3625 # when the mp.synchronize module cannot be imported. 3626 support.skip_if_broken_multiprocessing_synchronize() 3627 for i in range(self.repeat): 3628 log_queue = multiprocessing.Queue() 3629 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3630 log_queue.close() 3631 log_queue.join_thread() 3632 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3633 'correct number of handled log messages') 3634 3635 @staticmethod 3636 def get_all_from_queue(log_queue): 3637 try: 3638 while True: 3639 yield log_queue.get_nowait() 3640 except queue.Empty: 3641 return [] 3642 3643 def test_no_messages_in_queue_after_stop(self): 3644 """ 3645 Five messages are logged then the QueueListener is stopped. This 3646 test then gets everything off the queue. Failure of this test 3647 indicates that messages were not registered on the queue until 3648 _after_ the QueueListener stopped. 
3649 """ 3650 # bpo-28668: The multiprocessing (mp) module is not functional 3651 # when the mp.synchronize module cannot be imported. 3652 support.skip_if_broken_multiprocessing_synchronize() 3653 for i in range(self.repeat): 3654 queue = multiprocessing.Queue() 3655 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3656 # time.sleep(1) 3657 items = list(self.get_all_from_queue(queue)) 3658 queue.close() 3659 queue.join_thread() 3660 3661 expected = [[], [logging.handlers.QueueListener._sentinel]] 3662 self.assertIn(items, expected, 3663 'Found unexpected messages in queue: %s' % ( 3664 [m.msg if isinstance(m, logging.LogRecord) 3665 else m for m in items])) 3666 3667 def test_calls_task_done_after_stop(self): 3668 # Issue 36813: Make sure queue.join does not deadlock. 3669 log_queue = queue.Queue() 3670 listener = logging.handlers.QueueListener(log_queue) 3671 listener.start() 3672 listener.stop() 3673 with self.assertRaises(ValueError): 3674 # Make sure all tasks are done and .join won't block. 3675 log_queue.task_done() 3676 3677 3678ZERO = datetime.timedelta(0) 3679 3680class UTC(datetime.tzinfo): 3681 def utcoffset(self, dt): 3682 return ZERO 3683 3684 dst = utcoffset 3685 3686 def tzname(self, dt): 3687 return 'UTC' 3688 3689utc = UTC() 3690 3691class FormatterTest(unittest.TestCase): 3692 def setUp(self): 3693 self.common = { 3694 'name': 'formatter.test', 3695 'level': logging.DEBUG, 3696 'pathname': os.path.join('path', 'to', 'dummy.ext'), 3697 'lineno': 42, 3698 'exc_info': None, 3699 'func': None, 3700 'msg': 'Message with %d %s', 3701 'args': (2, 'placeholders'), 3702 } 3703 self.variants = { 3704 } 3705 3706 def get_record(self, name=None): 3707 result = dict(self.common) 3708 if name is not None: 3709 result.update(self.variants[name]) 3710 return logging.makeLogRecord(result) 3711 3712 def assert_error_message(self, exception, message, *args, **kwargs): 3713 try: 3714 self.assertRaises(exception, *args, **kwargs) 3715 except exception as e: 3716 self.assertEqual(message, e.message) 3717 3718 def test_percent(self): 3719 # Test %-formatting 3720 r = self.get_record() 3721 f = logging.Formatter('${%(message)s}') 3722 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 3723 f = logging.Formatter('%(random)s') 3724 self.assertRaises(ValueError, f.format, r) 3725 self.assertFalse(f.usesTime()) 3726 f = logging.Formatter('%(asctime)s') 3727 self.assertTrue(f.usesTime()) 3728 f = logging.Formatter('%(asctime)-15s') 3729 self.assertTrue(f.usesTime()) 3730 f = logging.Formatter('%(asctime)#15s') 3731 self.assertTrue(f.usesTime()) 3732 3733 def test_braces(self): 3734 # Test {}-formatting 3735 r = self.get_record() 3736 f = logging.Formatter('$%{message}%$', style='{') 3737 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3738 f = logging.Formatter('{random}', style='{') 3739 self.assertRaises(ValueError, f.format, r) 3740 f = logging.Formatter("{message}", style='{') 3741 self.assertFalse(f.usesTime()) 3742 f = logging.Formatter('{asctime}', style='{') 3743 self.assertTrue(f.usesTime()) 3744 f = logging.Formatter('{asctime!s:15}', style='{') 3745 self.assertTrue(f.usesTime()) 3746 f = logging.Formatter('{asctime:15}', style='{') 3747 self.assertTrue(f.usesTime()) 3748 3749 def test_dollars(self): 3750 # Test $-formatting 3751 r = self.get_record() 3752 f = logging.Formatter('${message}', style='$') 3753 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3754 f = logging.Formatter('$message', style='$') 3755 self.assertEqual(f.format(r), 
'Message with 2 placeholders') 3756 f = logging.Formatter('$$%${message}%$$', style='$') 3757 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3758 f = logging.Formatter('${random}', style='$') 3759 self.assertRaises(ValueError, f.format, r) 3760 self.assertFalse(f.usesTime()) 3761 f = logging.Formatter('${asctime}', style='$') 3762 self.assertTrue(f.usesTime()) 3763 f = logging.Formatter('$asctime', style='$') 3764 self.assertTrue(f.usesTime()) 3765 f = logging.Formatter('${message}', style='$') 3766 self.assertFalse(f.usesTime()) 3767 f = logging.Formatter('${asctime}--', style='$') 3768 self.assertTrue(f.usesTime()) 3769 3770 def test_format_validate(self): 3771 # Check correct formatting 3772 # Percentage style 3773 f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3774 self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3775 f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3776 self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3777 f = logging.Formatter("%(process)#+027.23X") 3778 self.assertEqual(f._fmt, "%(process)#+027.23X") 3779 f = logging.Formatter("%(foo)#.*g") 3780 self.assertEqual(f._fmt, "%(foo)#.*g") 3781 3782 # StrFormat Style 3783 f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{") 3784 self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}") 3785 f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{") 3786 self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}") 3787 f = logging.Formatter("{customfield!s:#<30}", style="{") 3788 self.assertEqual(f._fmt, "{customfield!s:#<30}") 3789 f = logging.Formatter("{message!r}", style="{") 3790 self.assertEqual(f._fmt, "{message!r}") 3791 f = logging.Formatter("{message!s}", style="{") 3792 self.assertEqual(f._fmt, "{message!s}") 3793 f = logging.Formatter("{message!a}", style="{") 3794 self.assertEqual(f._fmt, "{message!a}") 3795 f = logging.Formatter("{process!r:4.2}", style="{") 3796 self.assertEqual(f._fmt, "{process!r:4.2}") 3797 f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{") 3798 self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}") 3799 f = logging.Formatter("{process!s:{w},.{p}}", style="{") 3800 self.assertEqual(f._fmt, "{process!s:{w},.{p}}") 3801 f = logging.Formatter("{foo:12.{p}}", style="{") 3802 self.assertEqual(f._fmt, "{foo:12.{p}}") 3803 f = logging.Formatter("{foo:{w}.6}", style="{") 3804 self.assertEqual(f._fmt, "{foo:{w}.6}") 3805 f = logging.Formatter("{foo[0].bar[1].baz}", style="{") 3806 self.assertEqual(f._fmt, "{foo[0].bar[1].baz}") 3807 f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{") 3808 self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}") 3809 f = logging.Formatter("{12[k1].bar[k2].baz}", style="{") 3810 self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}") 3811 3812 # Dollar style 3813 f = logging.Formatter("${asctime} - $message", style="$") 3814 self.assertEqual(f._fmt, "${asctime} - $message") 3815 f = logging.Formatter("$bar $$", style="$") 3816 self.assertEqual(f._fmt, "$bar $$") 3817 f = logging.Formatter("$bar $$$$", style="$") 3818 self.assertEqual(f._fmt, "$bar $$$$") # this would print two $($$) 3819 3820 # Testing when ValueError being raised from incorrect format 3821 # Percentage Style 3822 self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z") 3823 
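        # 'Z' (above) and 'b' (below) are not valid %-style conversion
        # types, so both formats are rejected when the Formatter is built.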
self.assertRaises(ValueError, logging.Formatter, "%(asctime)b") 3824 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*") 3825 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s") 3826 self.assertRaises(ValueError, logging.Formatter, "%(asctime)_") 3827 self.assertRaises(ValueError, logging.Formatter, '{asctime}') 3828 self.assertRaises(ValueError, logging.Formatter, '${message}') 3829 self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f') # with both * and decimal number as precision 3830 self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f') 3831 3832 # StrFormat Style 3833 # Testing failure for '-' in field name 3834 self.assert_error_message( 3835 ValueError, 3836 "invalid field name/expression: 'name-thing'", 3837 logging.Formatter, "{name-thing}", style="{" 3838 ) 3839 # Testing failure for style mismatch 3840 self.assert_error_message( 3841 ValueError, 3842 "invalid format: no fields", 3843 logging.Formatter, '%(asctime)s', style='{' 3844 ) 3845 # Testing failure for invalid conversion 3846 self.assert_error_message( 3847 ValueError, 3848 "invalid conversion: 'Z'" 3849 ) 3850 self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{') 3851 self.assert_error_message( 3852 ValueError, 3853 "invalid format: expected ':' after conversion specifier", 3854 logging.Formatter, '{asctime!aa:15}', style='{' 3855 ) 3856 # Testing failure for invalid spec 3857 self.assert_error_message( 3858 ValueError, 3859 "bad specifier: '.2ff'", 3860 logging.Formatter, '{process:.2ff}', style='{' 3861 ) 3862 self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{') 3863 self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{') 3864 self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{') 3865 self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{') 3866 # Testing failure for mismatch braces 3867 self.assert_error_message( 3868 ValueError, 3869 "invalid format: unmatched '{' in format spec", 3870 logging.Formatter, '{process', style='{' 3871 ) 3872 self.assert_error_message( 3873 ValueError, 3874 "invalid format: unmatched '{' in format spec", 3875 logging.Formatter, 'process}', style='{' 3876 ) 3877 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{') 3878 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{') 3879 self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{') 3880 self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{') 3881 self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{') 3882 self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{') 3883 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{') 3884 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{') 3885 self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{') 3886 3887 # Dollar style 3888 # Testing failure for mismatch bare $ 3889 self.assert_error_message( 3890 ValueError, 3891 "invalid format: bare \'$\' not allowed", 3892 logging.Formatter, '$bar $$$', style='$' 3893 ) 3894 self.assert_error_message( 3895 ValueError, 3896 "invalid format: bare \'$\' not allowed", 3897 logging.Formatter, 'bar $', style='$' 3898 ) 3899 self.assert_error_message( 3900 ValueError, 3901 "invalid format: bare \'$\' not allowed", 3902 logging.Formatter, 'foo $.', style='$' 3903 ) 3904 # 
Testing failure for mismatch style 3905 self.assert_error_message( 3906 ValueError, 3907 "invalid format: no fields", 3908 logging.Formatter, '{asctime}', style='$' 3909 ) 3910 self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$') 3911 3912 # Testing failure for incorrect fields 3913 self.assert_error_message( 3914 ValueError, 3915 "invalid format: no fields", 3916 logging.Formatter, 'foo', style='$' 3917 ) 3918 self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$') 3919 3920 def test_invalid_style(self): 3921 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 3922 3923 def test_time(self): 3924 r = self.get_record() 3925 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 3926 # We use None to indicate we want the local timezone 3927 # We're essentially converting a UTC time to local time 3928 r.created = time.mktime(dt.astimezone(None).timetuple()) 3929 r.msecs = 123 3930 f = logging.Formatter('%(asctime)s %(message)s') 3931 f.converter = time.gmtime 3932 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 3933 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 3934 f.format(r) 3935 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 3936 3937class TestBufferingFormatter(logging.BufferingFormatter): 3938 def formatHeader(self, records): 3939 return '[(%d)' % len(records) 3940 3941 def formatFooter(self, records): 3942 return '(%d)]' % len(records) 3943 3944class BufferingFormatterTest(unittest.TestCase): 3945 def setUp(self): 3946 self.records = [ 3947 logging.makeLogRecord({'msg': 'one'}), 3948 logging.makeLogRecord({'msg': 'two'}), 3949 ] 3950 3951 def test_default(self): 3952 f = logging.BufferingFormatter() 3953 self.assertEqual('', f.format([])) 3954 self.assertEqual('onetwo', f.format(self.records)) 3955 3956 def test_custom(self): 3957 f = TestBufferingFormatter() 3958 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 3959 lf = logging.Formatter('<%(message)s>') 3960 f = TestBufferingFormatter(lf) 3961 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 3962 3963class ExceptionTest(BaseTest): 3964 def test_formatting(self): 3965 r = self.root_logger 3966 h = RecordingHandler() 3967 r.addHandler(h) 3968 try: 3969 raise RuntimeError('deliberate mistake') 3970 except: 3971 logging.exception('failed', stack_info=True) 3972 r.removeHandler(h) 3973 h.close() 3974 r = h.records[0] 3975 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 3976 'call last):\n')) 3977 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 3978 'deliberate mistake')) 3979 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 3980 'call last):\n')) 3981 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 3982 'stack_info=True)')) 3983 3984 3985class LastResortTest(BaseTest): 3986 def test_last_resort(self): 3987 # Test the last resort handler 3988 root = self.root_logger 3989 root.removeHandler(self.root_hdlr) 3990 old_lastresort = logging.lastResort 3991 old_raise_exceptions = logging.raiseExceptions 3992 3993 try: 3994 with support.captured_stderr() as stderr: 3995 root.debug('This should not appear') 3996 self.assertEqual(stderr.getvalue(), '') 3997 root.warning('Final chance!') 3998 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 3999 4000 # No handlers and no last resort, so 'No handlers' message 4001 logging.lastResort = None 4002 with support.captured_stderr() as stderr: 4003 root.warning('Final chance!') 4004 msg = 'No handlers could be found for logger "root"\n' 4005 
self.assertEqual(stderr.getvalue(), msg) 4006 4007 # 'No handlers' message only printed once 4008 with support.captured_stderr() as stderr: 4009 root.warning('Final chance!') 4010 self.assertEqual(stderr.getvalue(), '') 4011 4012 # If raiseExceptions is False, no message is printed 4013 root.manager.emittedNoHandlerWarning = False 4014 logging.raiseExceptions = False 4015 with support.captured_stderr() as stderr: 4016 root.warning('Final chance!') 4017 self.assertEqual(stderr.getvalue(), '') 4018 finally: 4019 root.addHandler(self.root_hdlr) 4020 logging.lastResort = old_lastresort 4021 logging.raiseExceptions = old_raise_exceptions 4022 4023 4024class FakeHandler: 4025 4026 def __init__(self, identifier, called): 4027 for method in ('acquire', 'flush', 'close', 'release'): 4028 setattr(self, method, self.record_call(identifier, method, called)) 4029 4030 def record_call(self, identifier, method_name, called): 4031 def inner(): 4032 called.append('{} - {}'.format(identifier, method_name)) 4033 return inner 4034 4035 4036class RecordingHandler(logging.NullHandler): 4037 4038 def __init__(self, *args, **kwargs): 4039 super(RecordingHandler, self).__init__(*args, **kwargs) 4040 self.records = [] 4041 4042 def handle(self, record): 4043 """Keep track of all the emitted records.""" 4044 self.records.append(record) 4045 4046 4047class ShutdownTest(BaseTest): 4048 4049 """Test suite for the shutdown method.""" 4050 4051 def setUp(self): 4052 super(ShutdownTest, self).setUp() 4053 self.called = [] 4054 4055 raise_exceptions = logging.raiseExceptions 4056 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 4057 4058 def raise_error(self, error): 4059 def inner(): 4060 raise error() 4061 return inner 4062 4063 def test_no_failure(self): 4064 # create some fake handlers 4065 handler0 = FakeHandler(0, self.called) 4066 handler1 = FakeHandler(1, self.called) 4067 handler2 = FakeHandler(2, self.called) 4068 4069 # create live weakref to those handlers 4070 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 4071 4072 logging.shutdown(handlerList=list(handlers)) 4073 4074 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 4075 '1 - acquire', '1 - flush', '1 - close', '1 - release', 4076 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 4077 self.assertEqual(expected, self.called) 4078 4079 def _test_with_failure_in_method(self, method, error): 4080 handler = FakeHandler(0, self.called) 4081 setattr(handler, method, self.raise_error(error)) 4082 handlers = [logging.weakref.ref(handler)] 4083 4084 logging.shutdown(handlerList=list(handlers)) 4085 4086 self.assertEqual('0 - release', self.called[-1]) 4087 4088 def test_with_ioerror_in_acquire(self): 4089 self._test_with_failure_in_method('acquire', OSError) 4090 4091 def test_with_ioerror_in_flush(self): 4092 self._test_with_failure_in_method('flush', OSError) 4093 4094 def test_with_ioerror_in_close(self): 4095 self._test_with_failure_in_method('close', OSError) 4096 4097 def test_with_valueerror_in_acquire(self): 4098 self._test_with_failure_in_method('acquire', ValueError) 4099 4100 def test_with_valueerror_in_flush(self): 4101 self._test_with_failure_in_method('flush', ValueError) 4102 4103 def test_with_valueerror_in_close(self): 4104 self._test_with_failure_in_method('close', ValueError) 4105 4106 def test_with_other_error_in_acquire_without_raise(self): 4107 logging.raiseExceptions = False 4108 self._test_with_failure_in_method('acquire', IndexError) 4109 4110 def 
test_with_other_error_in_flush_without_raise(self): 4111 logging.raiseExceptions = False 4112 self._test_with_failure_in_method('flush', IndexError) 4113 4114 def test_with_other_error_in_close_without_raise(self): 4115 logging.raiseExceptions = False 4116 self._test_with_failure_in_method('close', IndexError) 4117 4118 def test_with_other_error_in_acquire_with_raise(self): 4119 logging.raiseExceptions = True 4120 self.assertRaises(IndexError, self._test_with_failure_in_method, 4121 'acquire', IndexError) 4122 4123 def test_with_other_error_in_flush_with_raise(self): 4124 logging.raiseExceptions = True 4125 self.assertRaises(IndexError, self._test_with_failure_in_method, 4126 'flush', IndexError) 4127 4128 def test_with_other_error_in_close_with_raise(self): 4129 logging.raiseExceptions = True 4130 self.assertRaises(IndexError, self._test_with_failure_in_method, 4131 'close', IndexError) 4132 4133 4134class ModuleLevelMiscTest(BaseTest): 4135 4136 """Test suite for some module level methods.""" 4137 4138 def test_disable(self): 4139 old_disable = logging.root.manager.disable 4140 # confirm our assumptions are correct 4141 self.assertEqual(old_disable, 0) 4142 self.addCleanup(logging.disable, old_disable) 4143 4144 logging.disable(83) 4145 self.assertEqual(logging.root.manager.disable, 83) 4146 4147 # test the default value introduced in 3.7 4148 # (Issue #28524) 4149 logging.disable() 4150 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 4151 4152 def _test_log(self, method, level=None): 4153 called = [] 4154 support.patch(self, logging, 'basicConfig', 4155 lambda *a, **kw: called.append((a, kw))) 4156 4157 recording = RecordingHandler() 4158 logging.root.addHandler(recording) 4159 4160 log_method = getattr(logging, method) 4161 if level is not None: 4162 log_method(level, "test me: %r", recording) 4163 else: 4164 log_method("test me: %r", recording) 4165 4166 self.assertEqual(len(recording.records), 1) 4167 record = recording.records[0] 4168 self.assertEqual(record.getMessage(), "test me: %r" % recording) 4169 4170 expected_level = level if level is not None else getattr(logging, method.upper()) 4171 self.assertEqual(record.levelno, expected_level) 4172 4173 # basicConfig was not called! 
4174 self.assertEqual(called, []) 4175 4176 def test_log(self): 4177 self._test_log('log', logging.ERROR) 4178 4179 def test_debug(self): 4180 self._test_log('debug') 4181 4182 def test_info(self): 4183 self._test_log('info') 4184 4185 def test_warning(self): 4186 self._test_log('warning') 4187 4188 def test_error(self): 4189 self._test_log('error') 4190 4191 def test_critical(self): 4192 self._test_log('critical') 4193 4194 def test_set_logger_class(self): 4195 self.assertRaises(TypeError, logging.setLoggerClass, object) 4196 4197 class MyLogger(logging.Logger): 4198 pass 4199 4200 logging.setLoggerClass(MyLogger) 4201 self.assertEqual(logging.getLoggerClass(), MyLogger) 4202 4203 logging.setLoggerClass(logging.Logger) 4204 self.assertEqual(logging.getLoggerClass(), logging.Logger) 4205 4206 def test_subclass_logger_cache(self): 4207 # bpo-37258 4208 message = [] 4209 4210 class MyLogger(logging.getLoggerClass()): 4211 def __init__(self, name='MyLogger', level=logging.NOTSET): 4212 super().__init__(name, level) 4213 message.append('initialized') 4214 4215 logging.setLoggerClass(MyLogger) 4216 logger = logging.getLogger('just_some_logger') 4217 self.assertEqual(message, ['initialized']) 4218 stream = io.StringIO() 4219 h = logging.StreamHandler(stream) 4220 logger.addHandler(h) 4221 try: 4222 logger.setLevel(logging.DEBUG) 4223 logger.debug("hello") 4224 self.assertEqual(stream.getvalue().strip(), "hello") 4225 4226 stream.truncate(0) 4227 stream.seek(0) 4228 4229 logger.setLevel(logging.INFO) 4230 logger.debug("hello") 4231 self.assertEqual(stream.getvalue(), "") 4232 finally: 4233 logger.removeHandler(h) 4234 h.close() 4235 logging.setLoggerClass(logging.Logger) 4236 4237 @support.requires_type_collecting 4238 def test_logging_at_shutdown(self): 4239 # Issue #20037 4240 code = """if 1: 4241 import logging 4242 4243 class A: 4244 def __del__(self): 4245 try: 4246 raise ValueError("some error") 4247 except Exception: 4248 logging.exception("exception in __del__") 4249 4250 a = A()""" 4251 rc, out, err = assert_python_ok("-c", code) 4252 err = err.decode() 4253 self.assertIn("exception in __del__", err) 4254 self.assertIn("ValueError: some error", err) 4255 4256 def test_recursion_error(self): 4257 # Issue 36272 4258 code = """if 1: 4259 import logging 4260 4261 def rec(): 4262 logging.error("foo") 4263 rec() 4264 4265 rec()""" 4266 rc, out, err = assert_python_failure("-c", code) 4267 err = err.decode() 4268 self.assertNotIn("Cannot recover from stack overflow.", err) 4269 self.assertEqual(rc, 1) 4270 4271 4272class LogRecordTest(BaseTest): 4273 def test_str_rep(self): 4274 r = logging.makeLogRecord({}) 4275 s = str(r) 4276 self.assertTrue(s.startswith('<LogRecord: ')) 4277 self.assertTrue(s.endswith('>')) 4278 4279 def test_dict_arg(self): 4280 h = RecordingHandler() 4281 r = logging.getLogger() 4282 r.addHandler(h) 4283 d = {'less' : 'more' } 4284 logging.warning('less is %(less)s', d) 4285 self.assertIs(h.records[0].args, d) 4286 self.assertEqual(h.records[0].message, 'less is more') 4287 r.removeHandler(h) 4288 h.close() 4289 4290 def test_multiprocessing(self): 4291 r = logging.makeLogRecord({}) 4292 self.assertEqual(r.processName, 'MainProcess') 4293 try: 4294 import multiprocessing as mp 4295 r = logging.makeLogRecord({}) 4296 self.assertEqual(r.processName, mp.current_process().name) 4297 except ImportError: 4298 pass 4299 4300 def test_optional(self): 4301 r = logging.makeLogRecord({}) 4302 NOT_NONE = self.assertIsNotNone 4303 NOT_NONE(r.thread) 4304 NOT_NONE(r.threadName) 4305 
NOT_NONE(r.process) 4306 NOT_NONE(r.processName) 4307 log_threads = logging.logThreads 4308 log_processes = logging.logProcesses 4309 log_multiprocessing = logging.logMultiprocessing 4310 try: 4311 logging.logThreads = False 4312 logging.logProcesses = False 4313 logging.logMultiprocessing = False 4314 r = logging.makeLogRecord({}) 4315 NONE = self.assertIsNone 4316 NONE(r.thread) 4317 NONE(r.threadName) 4318 NONE(r.process) 4319 NONE(r.processName) 4320 finally: 4321 logging.logThreads = log_threads 4322 logging.logProcesses = log_processes 4323 logging.logMultiprocessing = log_multiprocessing 4324 4325class BasicConfigTest(unittest.TestCase): 4326 4327 """Test suite for logging.basicConfig.""" 4328 4329 def setUp(self): 4330 super(BasicConfigTest, self).setUp() 4331 self.handlers = logging.root.handlers 4332 self.saved_handlers = logging._handlers.copy() 4333 self.saved_handler_list = logging._handlerList[:] 4334 self.original_logging_level = logging.root.level 4335 self.addCleanup(self.cleanup) 4336 logging.root.handlers = [] 4337 4338 def tearDown(self): 4339 for h in logging.root.handlers[:]: 4340 logging.root.removeHandler(h) 4341 h.close() 4342 super(BasicConfigTest, self).tearDown() 4343 4344 def cleanup(self): 4345 setattr(logging.root, 'handlers', self.handlers) 4346 logging._handlers.clear() 4347 logging._handlers.update(self.saved_handlers) 4348 logging._handlerList[:] = self.saved_handler_list 4349 logging.root.setLevel(self.original_logging_level) 4350 4351 def test_no_kwargs(self): 4352 logging.basicConfig() 4353 4354 # handler defaults to a StreamHandler to sys.stderr 4355 self.assertEqual(len(logging.root.handlers), 1) 4356 handler = logging.root.handlers[0] 4357 self.assertIsInstance(handler, logging.StreamHandler) 4358 self.assertEqual(handler.stream, sys.stderr) 4359 4360 formatter = handler.formatter 4361 # format defaults to logging.BASIC_FORMAT 4362 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 4363 # datefmt defaults to None 4364 self.assertIsNone(formatter.datefmt) 4365 # style defaults to % 4366 self.assertIsInstance(formatter._style, logging.PercentStyle) 4367 4368 # level is not explicitly set 4369 self.assertEqual(logging.root.level, self.original_logging_level) 4370 4371 def test_strformatstyle(self): 4372 with support.captured_stdout() as output: 4373 logging.basicConfig(stream=sys.stdout, style="{") 4374 logging.error("Log an error") 4375 sys.stdout.seek(0) 4376 self.assertEqual(output.getvalue().strip(), 4377 "ERROR:root:Log an error") 4378 4379 def test_stringtemplatestyle(self): 4380 with support.captured_stdout() as output: 4381 logging.basicConfig(stream=sys.stdout, style="$") 4382 logging.error("Log an error") 4383 sys.stdout.seek(0) 4384 self.assertEqual(output.getvalue().strip(), 4385 "ERROR:root:Log an error") 4386 4387 def test_filename(self): 4388 4389 def cleanup(h1, h2, fn): 4390 h1.close() 4391 h2.close() 4392 os.remove(fn) 4393 4394 logging.basicConfig(filename='test.log') 4395 4396 self.assertEqual(len(logging.root.handlers), 1) 4397 handler = logging.root.handlers[0] 4398 self.assertIsInstance(handler, logging.FileHandler) 4399 4400 expected = logging.FileHandler('test.log', 'a') 4401 self.assertEqual(handler.stream.mode, expected.stream.mode) 4402 self.assertEqual(handler.stream.name, expected.stream.name) 4403 self.addCleanup(cleanup, handler, expected, 'test.log') 4404 4405 def test_filemode(self): 4406 4407 def cleanup(h1, h2, fn): 4408 h1.close() 4409 h2.close() 4410 os.remove(fn) 4411 4412 
logging.basicConfig(filename='test.log', filemode='wb') 4413 4414 handler = logging.root.handlers[0] 4415 expected = logging.FileHandler('test.log', 'wb') 4416 self.assertEqual(handler.stream.mode, expected.stream.mode) 4417 self.addCleanup(cleanup, handler, expected, 'test.log') 4418 4419 def test_stream(self): 4420 stream = io.StringIO() 4421 self.addCleanup(stream.close) 4422 logging.basicConfig(stream=stream) 4423 4424 self.assertEqual(len(logging.root.handlers), 1) 4425 handler = logging.root.handlers[0] 4426 self.assertIsInstance(handler, logging.StreamHandler) 4427 self.assertEqual(handler.stream, stream) 4428 4429 def test_format(self): 4430 logging.basicConfig(format='%(asctime)s - %(message)s') 4431 4432 formatter = logging.root.handlers[0].formatter 4433 self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s') 4434 4435 def test_datefmt(self): 4436 logging.basicConfig(datefmt='bar') 4437 4438 formatter = logging.root.handlers[0].formatter 4439 self.assertEqual(formatter.datefmt, 'bar') 4440 4441 def test_style(self): 4442 logging.basicConfig(style='$') 4443 4444 formatter = logging.root.handlers[0].formatter 4445 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 4446 4447 def test_level(self): 4448 old_level = logging.root.level 4449 self.addCleanup(logging.root.setLevel, old_level) 4450 4451 logging.basicConfig(level=57) 4452 self.assertEqual(logging.root.level, 57) 4453 # Test that second call has no effect 4454 logging.basicConfig(level=58) 4455 self.assertEqual(logging.root.level, 57) 4456 4457 def test_incompatible(self): 4458 assertRaises = self.assertRaises 4459 handlers = [logging.StreamHandler()] 4460 stream = sys.stderr 4461 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4462 stream=stream) 4463 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4464 handlers=handlers) 4465 assertRaises(ValueError, logging.basicConfig, stream=stream, 4466 handlers=handlers) 4467 # Issue 23207: test for invalid kwargs 4468 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 4469 # Should pop both filename and filemode even if filename is None 4470 logging.basicConfig(filename=None, filemode='a') 4471 4472 def test_handlers(self): 4473 handlers = [ 4474 logging.StreamHandler(), 4475 logging.StreamHandler(sys.stdout), 4476 logging.StreamHandler(), 4477 ] 4478 f = logging.Formatter() 4479 handlers[2].setFormatter(f) 4480 logging.basicConfig(handlers=handlers) 4481 self.assertIs(handlers[0], logging.root.handlers[0]) 4482 self.assertIs(handlers[1], logging.root.handlers[1]) 4483 self.assertIs(handlers[2], logging.root.handlers[2]) 4484 self.assertIsNotNone(handlers[0].formatter) 4485 self.assertIsNotNone(handlers[1].formatter) 4486 self.assertIs(handlers[2].formatter, f) 4487 self.assertIs(handlers[0].formatter, handlers[1].formatter) 4488 4489 def test_force(self): 4490 old_string_io = io.StringIO() 4491 new_string_io = io.StringIO() 4492 old_handlers = [logging.StreamHandler(old_string_io)] 4493 new_handlers = [logging.StreamHandler(new_string_io)] 4494 logging.basicConfig(level=logging.WARNING, handlers=old_handlers) 4495 logging.warning('warn') 4496 logging.info('info') 4497 logging.debug('debug') 4498 self.assertEqual(len(logging.root.handlers), 1) 4499 logging.basicConfig(level=logging.INFO, handlers=new_handlers, 4500 force=True) 4501 logging.warning('warn') 4502 logging.info('info') 4503 logging.debug('debug') 4504 self.assertEqual(len(logging.root.handlers), 1) 4505 
self.assertEqual(old_string_io.getvalue().strip(), 4506 'WARNING:root:warn') 4507 self.assertEqual(new_string_io.getvalue().strip(), 4508 'WARNING:root:warn\nINFO:root:info') 4509 4510 def _test_log(self, method, level=None): 4511 # logging.root has no handlers so basicConfig should be called 4512 called = [] 4513 4514 old_basic_config = logging.basicConfig 4515 def my_basic_config(*a, **kw): 4516 old_basic_config() 4517 old_level = logging.root.level 4518 logging.root.setLevel(100) # avoid having messages in stderr 4519 self.addCleanup(logging.root.setLevel, old_level) 4520 called.append((a, kw)) 4521 4522 support.patch(self, logging, 'basicConfig', my_basic_config) 4523 4524 log_method = getattr(logging, method) 4525 if level is not None: 4526 log_method(level, "test me") 4527 else: 4528 log_method("test me") 4529 4530 # basicConfig was called with no arguments 4531 self.assertEqual(called, [((), {})]) 4532 4533 def test_log(self): 4534 self._test_log('log', logging.WARNING) 4535 4536 def test_debug(self): 4537 self._test_log('debug') 4538 4539 def test_info(self): 4540 self._test_log('info') 4541 4542 def test_warning(self): 4543 self._test_log('warning') 4544 4545 def test_error(self): 4546 self._test_log('error') 4547 4548 def test_critical(self): 4549 self._test_log('critical') 4550 4551 4552class LoggerAdapterTest(unittest.TestCase): 4553 def setUp(self): 4554 super(LoggerAdapterTest, self).setUp() 4555 old_handler_list = logging._handlerList[:] 4556 4557 self.recording = RecordingHandler() 4558 self.logger = logging.root 4559 self.logger.addHandler(self.recording) 4560 self.addCleanup(self.logger.removeHandler, self.recording) 4561 self.addCleanup(self.recording.close) 4562 4563 def cleanup(): 4564 logging._handlerList[:] = old_handler_list 4565 4566 self.addCleanup(cleanup) 4567 self.addCleanup(logging.shutdown) 4568 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 4569 4570 def test_exception(self): 4571 msg = 'testing exception: %r' 4572 exc = None 4573 try: 4574 1 / 0 4575 except ZeroDivisionError as e: 4576 exc = e 4577 self.adapter.exception(msg, self.recording) 4578 4579 self.assertEqual(len(self.recording.records), 1) 4580 record = self.recording.records[0] 4581 self.assertEqual(record.levelno, logging.ERROR) 4582 self.assertEqual(record.msg, msg) 4583 self.assertEqual(record.args, (self.recording,)) 4584 self.assertEqual(record.exc_info, 4585 (exc.__class__, exc, exc.__traceback__)) 4586 4587 def test_exception_excinfo(self): 4588 try: 4589 1 / 0 4590 except ZeroDivisionError as e: 4591 exc = e 4592 4593 self.adapter.exception('exc_info test', exc_info=exc) 4594 4595 self.assertEqual(len(self.recording.records), 1) 4596 record = self.recording.records[0] 4597 self.assertEqual(record.exc_info, 4598 (exc.__class__, exc, exc.__traceback__)) 4599 4600 def test_critical(self): 4601 msg = 'critical test! 
%r' 4602 self.adapter.critical(msg, self.recording) 4603 4604 self.assertEqual(len(self.recording.records), 1) 4605 record = self.recording.records[0] 4606 self.assertEqual(record.levelno, logging.CRITICAL) 4607 self.assertEqual(record.msg, msg) 4608 self.assertEqual(record.args, (self.recording,)) 4609 4610 def test_is_enabled_for(self): 4611 old_disable = self.adapter.logger.manager.disable 4612 self.adapter.logger.manager.disable = 33 4613 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 4614 old_disable) 4615 self.assertFalse(self.adapter.isEnabledFor(32)) 4616 4617 def test_has_handlers(self): 4618 self.assertTrue(self.adapter.hasHandlers()) 4619 4620 for handler in self.logger.handlers: 4621 self.logger.removeHandler(handler) 4622 4623 self.assertFalse(self.logger.hasHandlers()) 4624 self.assertFalse(self.adapter.hasHandlers()) 4625 4626 def test_nested(self): 4627 class Adapter(logging.LoggerAdapter): 4628 prefix = 'Adapter' 4629 4630 def process(self, msg, kwargs): 4631 return f"{self.prefix} {msg}", kwargs 4632 4633 msg = 'Adapters can be nested, yo.' 4634 adapter = Adapter(logger=self.logger, extra=None) 4635 adapter_adapter = Adapter(logger=adapter, extra=None) 4636 adapter_adapter.prefix = 'AdapterAdapter' 4637 self.assertEqual(repr(adapter), repr(adapter_adapter)) 4638 adapter_adapter.log(logging.CRITICAL, msg, self.recording) 4639 self.assertEqual(len(self.recording.records), 1) 4640 record = self.recording.records[0] 4641 self.assertEqual(record.levelno, logging.CRITICAL) 4642 self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}") 4643 self.assertEqual(record.args, (self.recording,)) 4644 orig_manager = adapter_adapter.manager 4645 self.assertIs(adapter.manager, orig_manager) 4646 self.assertIs(self.logger.manager, orig_manager) 4647 temp_manager = object() 4648 try: 4649 adapter_adapter.manager = temp_manager 4650 self.assertIs(adapter_adapter.manager, temp_manager) 4651 self.assertIs(adapter.manager, temp_manager) 4652 self.assertIs(self.logger.manager, temp_manager) 4653 finally: 4654 adapter_adapter.manager = orig_manager 4655 self.assertIs(adapter_adapter.manager, orig_manager) 4656 self.assertIs(adapter.manager, orig_manager) 4657 self.assertIs(self.logger.manager, orig_manager) 4658 4659 4660class LoggerTest(BaseTest): 4661 4662 def setUp(self): 4663 super(LoggerTest, self).setUp() 4664 self.recording = RecordingHandler() 4665 self.logger = logging.Logger(name='blah') 4666 self.logger.addHandler(self.recording) 4667 self.addCleanup(self.logger.removeHandler, self.recording) 4668 self.addCleanup(self.recording.close) 4669 self.addCleanup(logging.shutdown) 4670 4671 def test_set_invalid_level(self): 4672 self.assertRaises(TypeError, self.logger.setLevel, object()) 4673 4674 def test_exception(self): 4675 msg = 'testing exception: %r' 4676 exc = None 4677 try: 4678 1 / 0 4679 except ZeroDivisionError as e: 4680 exc = e 4681 self.logger.exception(msg, self.recording) 4682 4683 self.assertEqual(len(self.recording.records), 1) 4684 record = self.recording.records[0] 4685 self.assertEqual(record.levelno, logging.ERROR) 4686 self.assertEqual(record.msg, msg) 4687 self.assertEqual(record.args, (self.recording,)) 4688 self.assertEqual(record.exc_info, 4689 (exc.__class__, exc, exc.__traceback__)) 4690 4691 def test_log_invalid_level_with_raise(self): 4692 with support.swap_attr(logging, 'raiseExceptions', True): 4693 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 4694 4695 def test_log_invalid_level_no_raise(self): 4696 with 
support.swap_attr(logging, 'raiseExceptions', False): 4697 self.logger.log('10', 'test message') # no exception happens 4698 4699 def test_find_caller_with_stack_info(self): 4700 called = [] 4701 support.patch(self, logging.traceback, 'print_stack', 4702 lambda f, file: called.append(file.getvalue())) 4703 4704 self.logger.findCaller(stack_info=True) 4705 4706 self.assertEqual(len(called), 1) 4707 self.assertEqual('Stack (most recent call last):\n', called[0]) 4708 4709 def test_find_caller_with_stacklevel(self): 4710 the_level = 1 4711 4712 def innermost(): 4713 self.logger.warning('test', stacklevel=the_level) 4714 4715 def inner(): 4716 innermost() 4717 4718 def outer(): 4719 inner() 4720 4721 records = self.recording.records 4722 outer() 4723 self.assertEqual(records[-1].funcName, 'innermost') 4724 lineno = records[-1].lineno 4725 the_level += 1 4726 outer() 4727 self.assertEqual(records[-1].funcName, 'inner') 4728 self.assertGreater(records[-1].lineno, lineno) 4729 lineno = records[-1].lineno 4730 the_level += 1 4731 outer() 4732 self.assertEqual(records[-1].funcName, 'outer') 4733 self.assertGreater(records[-1].lineno, lineno) 4734 lineno = records[-1].lineno 4735 the_level += 1 4736 outer() 4737 self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') 4738 self.assertGreater(records[-1].lineno, lineno) 4739 4740 def test_make_record_with_extra_overwrite(self): 4741 name = 'my record' 4742 level = 13 4743 fn = lno = msg = args = exc_info = func = sinfo = None 4744 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 4745 exc_info, func, sinfo) 4746 4747 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 4748 extra = {key: 'some value'} 4749 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 4750 fn, lno, msg, args, exc_info, 4751 extra=extra, sinfo=sinfo) 4752 4753 def test_make_record_with_extra_no_overwrite(self): 4754 name = 'my record' 4755 level = 13 4756 fn = lno = msg = args = exc_info = func = sinfo = None 4757 extra = {'valid_key': 'some value'} 4758 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 4759 exc_info, extra=extra, sinfo=sinfo) 4760 self.assertIn('valid_key', result.__dict__) 4761 4762 def test_has_handlers(self): 4763 self.assertTrue(self.logger.hasHandlers()) 4764 4765 for handler in self.logger.handlers: 4766 self.logger.removeHandler(handler) 4767 self.assertFalse(self.logger.hasHandlers()) 4768 4769 def test_has_handlers_no_propagate(self): 4770 child_logger = logging.getLogger('blah.child') 4771 child_logger.propagate = False 4772 self.assertFalse(child_logger.hasHandlers()) 4773 4774 def test_is_enabled_for(self): 4775 old_disable = self.logger.manager.disable 4776 self.logger.manager.disable = 23 4777 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 4778 self.assertFalse(self.logger.isEnabledFor(22)) 4779 4780 def test_is_enabled_for_disabled_logger(self): 4781 old_disabled = self.logger.disabled 4782 old_disable = self.logger.manager.disable 4783 4784 self.logger.disabled = True 4785 self.logger.manager.disable = 21 4786 4787 self.addCleanup(setattr, self.logger, 'disabled', old_disabled) 4788 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 4789 4790 self.assertFalse(self.logger.isEnabledFor(22)) 4791 4792 def test_root_logger_aliases(self): 4793 root = logging.getLogger() 4794 self.assertIs(root, logging.root) 4795 self.assertIs(root, logging.getLogger(None)) 4796 self.assertIs(root, logging.getLogger('')) 4797 self.assertIs(root, 
logging.getLogger('foo').root) 4798 self.assertIs(root, logging.getLogger('foo.bar').root) 4799 self.assertIs(root, logging.getLogger('foo').parent) 4800 4801 self.assertIsNot(root, logging.getLogger('\0')) 4802 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 4803 4804 def test_invalid_names(self): 4805 self.assertRaises(TypeError, logging.getLogger, any) 4806 self.assertRaises(TypeError, logging.getLogger, b'foo') 4807 4808 def test_pickling(self): 4809 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4810 for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'): 4811 logger = logging.getLogger(name) 4812 s = pickle.dumps(logger, proto) 4813 unpickled = pickle.loads(s) 4814 self.assertIs(unpickled, logger) 4815 4816 def test_caching(self): 4817 root = self.root_logger 4818 logger1 = logging.getLogger("abc") 4819 logger2 = logging.getLogger("abc.def") 4820 4821 # Set root logger level and ensure cache is empty 4822 root.setLevel(logging.ERROR) 4823 self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR) 4824 self.assertEqual(logger2._cache, {}) 4825 4826 # Ensure cache is populated and calls are consistent 4827 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 4828 self.assertFalse(logger2.isEnabledFor(logging.DEBUG)) 4829 self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False}) 4830 self.assertEqual(root._cache, {}) 4831 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 4832 4833 # Ensure root cache gets populated 4834 self.assertEqual(root._cache, {}) 4835 self.assertTrue(root.isEnabledFor(logging.ERROR)) 4836 self.assertEqual(root._cache, {logging.ERROR: True}) 4837 4838 # Set parent logger level and ensure caches are emptied 4839 logger1.setLevel(logging.CRITICAL) 4840 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4841 self.assertEqual(logger2._cache, {}) 4842 4843 # Ensure logger2 uses parent logger's effective level 4844 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 4845 4846 # Set level to NOTSET and ensure caches are empty 4847 logger2.setLevel(logging.NOTSET) 4848 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4849 self.assertEqual(logger2._cache, {}) 4850 self.assertEqual(logger1._cache, {}) 4851 self.assertEqual(root._cache, {}) 4852 4853 # Verify logger2 follows parent and not root 4854 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 4855 self.assertTrue(logger2.isEnabledFor(logging.CRITICAL)) 4856 self.assertFalse(logger1.isEnabledFor(logging.ERROR)) 4857 self.assertTrue(logger1.isEnabledFor(logging.CRITICAL)) 4858 self.assertTrue(root.isEnabledFor(logging.ERROR)) 4859 4860 # Disable logging in manager and ensure caches are clear 4861 logging.disable() 4862 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4863 self.assertEqual(logger2._cache, {}) 4864 self.assertEqual(logger1._cache, {}) 4865 self.assertEqual(root._cache, {}) 4866 4867 # Ensure no loggers are enabled 4868 self.assertFalse(logger1.isEnabledFor(logging.CRITICAL)) 4869 self.assertFalse(logger2.isEnabledFor(logging.CRITICAL)) 4870 self.assertFalse(root.isEnabledFor(logging.CRITICAL)) 4871 4872 4873class BaseFileTest(BaseTest): 4874 "Base class for handler tests that write log files" 4875 4876 def setUp(self): 4877 BaseTest.setUp(self) 4878 fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") 4879 os.close(fd) 4880 self.rmfiles = [] 4881 4882 def tearDown(self): 4883 for fn in self.rmfiles: 4884 os.unlink(fn) 4885 if os.path.exists(self.fn): 4886 os.unlink(self.fn) 4887 BaseTest.tearDown(self) 4888 4889 def 
assertLogFile(self, filename): 4890 "Assert a log file is there and register it for deletion" 4891 self.assertTrue(os.path.exists(filename), 4892 msg="Log file %r does not exist" % filename) 4893 self.rmfiles.append(filename) 4894 4895 4896class FileHandlerTest(BaseFileTest): 4897 def test_delay(self): 4898 os.unlink(self.fn) 4899 fh = logging.FileHandler(self.fn, delay=True) 4900 self.assertIsNone(fh.stream) 4901 self.assertFalse(os.path.exists(self.fn)) 4902 fh.handle(logging.makeLogRecord({})) 4903 self.assertIsNotNone(fh.stream) 4904 self.assertTrue(os.path.exists(self.fn)) 4905 fh.close() 4906 4907class RotatingFileHandlerTest(BaseFileTest): 4908 def next_rec(self): 4909 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 4910 self.next_message(), None, None, None) 4911 4912 def test_should_not_rollover(self): 4913 # If maxbytes is zero rollover never occurs 4914 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) 4915 self.assertFalse(rh.shouldRollover(None)) 4916 rh.close() 4917 4918 def test_should_rollover(self): 4919 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) 4920 self.assertTrue(rh.shouldRollover(self.next_rec())) 4921 rh.close() 4922 4923 def test_file_created(self): 4924 # checks that the file is created and assumes it was created 4925 # by us 4926 rh = logging.handlers.RotatingFileHandler(self.fn) 4927 rh.emit(self.next_rec()) 4928 self.assertLogFile(self.fn) 4929 rh.close() 4930 4931 def test_rollover_filenames(self): 4932 def namer(name): 4933 return name + ".test" 4934 rh = logging.handlers.RotatingFileHandler( 4935 self.fn, backupCount=2, maxBytes=1) 4936 rh.namer = namer 4937 rh.emit(self.next_rec()) 4938 self.assertLogFile(self.fn) 4939 rh.emit(self.next_rec()) 4940 self.assertLogFile(namer(self.fn + ".1")) 4941 rh.emit(self.next_rec()) 4942 self.assertLogFile(namer(self.fn + ".2")) 4943 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4944 rh.close() 4945 4946 @support.requires_zlib 4947 def test_rotator(self): 4948 def namer(name): 4949 return name + ".gz" 4950 4951 def rotator(source, dest): 4952 with open(source, "rb") as sf: 4953 data = sf.read() 4954 compressed = zlib.compress(data, 9) 4955 with open(dest, "wb") as df: 4956 df.write(compressed) 4957 os.remove(source) 4958 4959 rh = logging.handlers.RotatingFileHandler( 4960 self.fn, backupCount=2, maxBytes=1) 4961 rh.rotator = rotator 4962 rh.namer = namer 4963 m1 = self.next_rec() 4964 rh.emit(m1) 4965 self.assertLogFile(self.fn) 4966 m2 = self.next_rec() 4967 rh.emit(m2) 4968 fn = namer(self.fn + ".1") 4969 self.assertLogFile(fn) 4970 newline = os.linesep 4971 with open(fn, "rb") as f: 4972 compressed = f.read() 4973 data = zlib.decompress(compressed) 4974 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4975 rh.emit(self.next_rec()) 4976 fn = namer(self.fn + ".2") 4977 self.assertLogFile(fn) 4978 with open(fn, "rb") as f: 4979 compressed = f.read() 4980 data = zlib.decompress(compressed) 4981 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4982 rh.emit(self.next_rec()) 4983 fn = namer(self.fn + ".2") 4984 with open(fn, "rb") as f: 4985 compressed = f.read() 4986 data = zlib.decompress(compressed) 4987 self.assertEqual(data.decode("ascii"), m2.msg + newline) 4988 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4989 rh.close() 4990 4991class TimedRotatingFileHandlerTest(BaseFileTest): 4992 # other test methods added below 4993 def test_rollover(self): 4994 fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S', 4995 backupCount=1) 4996 fmt = 
logging.Formatter('%(asctime)s %(message)s') 4997 fh.setFormatter(fmt) 4998 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 4999 fh.emit(r1) 5000 self.assertLogFile(self.fn) 5001 time.sleep(1.1) # a little over a second ... 5002 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 5003 fh.emit(r2) 5004 fh.close() 5005 # At this point, we should have a recent rotated file which we 5006 # can test for the existence of. However, in practice, on some 5007 # machines which run really slowly, we don't know how far back 5008 # in time to go to look for the log file. So, we go back a fair 5009 # bit, and stop as soon as we see a rotated file. In theory this 5010 # could of course still fail, but the chances are lower. 5011 found = False 5012 now = datetime.datetime.now() 5013 GO_BACK = 5 * 60 # seconds 5014 for secs in range(GO_BACK): 5015 prev = now - datetime.timedelta(seconds=secs) 5016 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 5017 found = os.path.exists(fn) 5018 if found: 5019 self.rmfiles.append(fn) 5020 break 5021 msg = 'No rotated files found, went back %d seconds' % GO_BACK 5022 if not found: 5023 # print additional diagnostics 5024 dn, fn = os.path.split(self.fn) 5025 files = [f for f in os.listdir(dn) if f.startswith(fn)] 5026 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 5027 print('The only matching files are: %s' % files, file=sys.stderr) 5028 for f in files: 5029 print('Contents of %s:' % f) 5030 path = os.path.join(dn, f) 5031 with open(path, 'r') as tf: 5032 print(tf.read()) 5033 self.assertTrue(found, msg=msg) 5034 5035 def test_invalid(self): 5036 assertRaises = self.assertRaises 5037 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5038 self.fn, 'X', delay=True) 5039 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5040 self.fn, 'W', delay=True) 5041 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5042 self.fn, 'W7', delay=True) 5043 5044 def test_compute_rollover_daily_attime(self): 5045 currentTime = 0 5046 atTime = datetime.time(12, 0, 0) 5047 rh = logging.handlers.TimedRotatingFileHandler( 5048 self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True, 5049 atTime=atTime) 5050 try: 5051 actual = rh.computeRollover(currentTime) 5052 self.assertEqual(actual, currentTime + 12 * 60 * 60) 5053 5054 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 5055 self.assertEqual(actual, currentTime + 36 * 60 * 60) 5056 finally: 5057 rh.close() 5058 5059 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 5060 def test_compute_rollover_weekly_attime(self): 5061 currentTime = int(time.time()) 5062 today = currentTime - currentTime % 86400 5063 5064 atTime = datetime.time(12, 0, 0) 5065 5066 wday = time.gmtime(today).tm_wday 5067 for day in range(7): 5068 rh = logging.handlers.TimedRotatingFileHandler( 5069 self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True, 5070 atTime=atTime) 5071 try: 5072 if wday > day: 5073 # The rollover day has already passed this week, so we 5074 # go over into next week 5075 expected = (7 - wday + day) 5076 else: 5077 expected = (day - wday) 5078 # At this point expected is in days from now, convert to seconds 5079 expected *= 24 * 60 * 60 5080 # Add in the rollover time 5081 expected += 12 * 60 * 60 5082 # Add in adjustment for today 5083 expected += today 5084 actual = rh.computeRollover(today) 5085 if actual != expected: 5086 print('failed in timezone: %d' % time.timezone) 5087 print('local vars: %s' % 
locals()) 5088 self.assertEqual(actual, expected) 5089 if day == wday: 5090 # goes into following week 5091 expected += 7 * 24 * 60 * 60 5092 actual = rh.computeRollover(today + 13 * 60 * 60) 5093 if actual != expected: 5094 print('failed in timezone: %d' % time.timezone) 5095 print('local vars: %s' % locals()) 5096 self.assertEqual(actual, expected) 5097 finally: 5098 rh.close() 5099 5100 5101def secs(**kw): 5102 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 5103 5104for when, exp in (('S', 1), 5105 ('M', 60), 5106 ('H', 60 * 60), 5107 ('D', 60 * 60 * 24), 5108 ('MIDNIGHT', 60 * 60 * 24), 5109 # current time (epoch start) is a Thursday, W0 means Monday 5110 ('W0', secs(days=4, hours=24)), 5111 ): 5112 def test_compute_rollover(self, when=when, exp=exp): 5113 rh = logging.handlers.TimedRotatingFileHandler( 5114 self.fn, when=when, interval=1, backupCount=0, utc=True) 5115 currentTime = 0.0 5116 actual = rh.computeRollover(currentTime) 5117 if exp != actual: 5118 # Failures occur on some systems for MIDNIGHT and W0. 5119 # Print detailed calculation for MIDNIGHT so we can try to see 5120 # what's going on 5121 if when == 'MIDNIGHT': 5122 try: 5123 if rh.utc: 5124 t = time.gmtime(currentTime) 5125 else: 5126 t = time.localtime(currentTime) 5127 currentHour = t[3] 5128 currentMinute = t[4] 5129 currentSecond = t[5] 5130 # r is the number of seconds left between now and midnight 5131 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 5132 currentMinute) * 60 + 5133 currentSecond) 5134 result = currentTime + r 5135 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 5136 print('currentHour: %s' % currentHour, file=sys.stderr) 5137 print('currentMinute: %s' % currentMinute, file=sys.stderr) 5138 print('currentSecond: %s' % currentSecond, file=sys.stderr) 5139 print('r: %s' % r, file=sys.stderr) 5140 print('result: %s' % result, file=sys.stderr) 5141 except Exception: 5142 print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) 5143 self.assertEqual(exp, actual) 5144 rh.close() 5145 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 5146 5147 5148@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 5149class NTEventLogHandlerTest(BaseTest): 5150 def test_basic(self): 5151 logtype = 'Application' 5152 elh = win32evtlog.OpenEventLog(None, logtype) 5153 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 5154 5155 try: 5156 h = logging.handlers.NTEventLogHandler('test_logging') 5157 except pywintypes.error as e: 5158 if e.winerror == 5: # access denied 5159 raise unittest.SkipTest('Insufficient privileges to run test') 5160 raise 5161 5162 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 5163 h.handle(r) 5164 h.close() 5165 # Now see if the event is recorded 5166 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 5167 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 5168 win32evtlog.EVENTLOG_SEQUENTIAL_READ 5169 found = False 5170 GO_BACK = 100 5171 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 5172 for e in events: 5173 if e.SourceName != 'test_logging': 5174 continue 5175 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 5176 if msg != 'Test Log Message\r\n': 5177 continue 5178 found = True 5179 break 5180 msg = 'Record not found in event log, went back %d records' % GO_BACK 5181 self.assertTrue(found, msg=msg) 5182 5183 5184class MiscTestCase(unittest.TestCase): 5185 def test__all__(self): 5186 blacklist = {'logThreads', 
                     'logMultiprocessing', 'logProcesses', 'currentframe',
                     'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
                     'Filterer', 'PlaceHolder', 'Manager', 'RootLogger',
                     'root', 'threading'}
        support.check__all__(self, logging, blacklist=blacklist)


# Run the tests under the platform-dependent default locale.  The reason for
# doing so is not documented here; we simply save the current locale first
# and restore it once the tests finish.
@support.run_with_locale('LC_ALL', '')
def test_main():
    tests = [
        BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
        HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
        DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
        ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest,
        StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
        QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
        LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest,
        RotatingFileHandlerTest, LastResortTest, LogRecordTest,
        ExceptionTest, SysLogHandlerTest, IPv6SysLogHandlerTest, HTTPHandlerTest,
        NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
        MiscTestCase
    ]
    if hasattr(logging.handlers, 'QueueListener'):
        tests.append(QueueListenerTest)
    support.run_unittest(*tests)

if __name__ == "__main__":
    test_main()
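

# ---------------------------------------------------------------------------
# Editor's note: the two sketches below are illustrative only -- they are not
# part of the test suite and test_main() never references them.  This first
# one shows the QueueHandler/QueueListener pairing exercised by
# QueueListenerTest above: records are funnelled through a queue.Queue and
# delivered to a real handler on a background thread, and stop() pushes the
# listener's sentinel onto the queue and joins that thread.  The logger name
# 'queue_demo' is an arbitrary choice for the example; queue and
# logging.handlers are already imported at the top of this module.
def _queue_listener_demo():
    log_queue = queue.Queue()
    stream_handler = logging.StreamHandler()
    listener = logging.handlers.QueueListener(log_queue, stream_handler)

    logger = logging.getLogger('queue_demo')
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))

    listener.start()
    try:
        logger.info('handled on the listener thread')
    finally:
        listener.stop()  # flushes pending records and joins the helper thread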
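

# Editor's note: second illustrative sketch (again, not run by test_main()).
# It mirrors RotatingFileHandlerTest.test_rotator above: the public
# ``rotator`` and ``namer`` hooks of RotatingFileHandler are used to
# gzip-compress each rotated backup.  The file name 'app.log', the size and
# backup limits, and the logger name 'rotating_demo' are arbitrary example
# values; os is already imported at module level.
def _rotating_gzip_handler_demo(filename='app.log'):
    import gzip

    def namer(default_name):
        # Rotated backups get a .gz suffix, e.g. app.log.1.gz
        return default_name + '.gz'

    def rotator(source, dest):
        # Compress the just-closed log file into the backup name, then drop
        # the uncompressed original.
        with open(source, 'rb') as sf, gzip.open(dest, 'wb') as df:
            df.write(sf.read())
        os.remove(source)

    handler = logging.handlers.RotatingFileHandler(
        filename, maxBytes=1024, backupCount=3)
    handler.rotator = rotator
    handler.namer = namer

    logger = logging.getLogger('rotating_demo')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return logger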