# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""Test harness for the logging module. Run all tests.

Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
"""

import logging
import logging.handlers
import logging.config

import codecs
import configparser
import copy
import datetime
import pathlib
import pickle
import io
import gc
import json
import os
import queue
import random
import re
import signal
import socket
import struct
import sys
import tempfile
from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support
import textwrap
import threading
import time
import unittest
import warnings
import weakref

import asyncore
from http.server import HTTPServer, BaseHTTPRequestHandler
import smtpd
from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
                          ThreadingTCPServer, StreamRequestHandler)

try:
    import win32evtlog, win32evtlogutil, pywintypes
except ImportError:
    win32evtlog = win32evtlogutil = pywintypes = None

try:
    import zlib
except ImportError:
    pass

class BaseTest(unittest.TestCase):

    """Base class for logging tests."""

    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        self._threading_key = support.threading_setup()

        logger_dict = logging.getLogger().manager.loggerDict
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = saved_loggers = logger_dict.copy()
            self.saved_name_to_level = logging._nameToLevel.copy()
            self.saved_level_to_name = logging._levelToName.copy()
            self.logger_states = logger_states = {}
            for name in saved_loggers:
                logger_states[name] = getattr(saved_loggers[name],
                                              'disabled', None)
        finally:
            logging._releaseLock()

        # Set two unused loggers
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        if self.logger1.hasHandlers():
            hlist = self.logger1.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        if self.logger2.hasHandlers():
            hlist = self.logger2.handlers + self.root_logger.handlers
            raise AssertionError('Unexpected handlers: %s' % hlist)
        self.root_logger.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        while self.root_logger.handlers:
            h = self.root_logger.handlers[0]
            self.root_logger.removeHandler(h)
            h.close()
        self.root_logger.setLevel(self.original_logging_level)
        logging._acquireLock()
        try:
            logging._levelToName.clear()
            logging._levelToName.update(self.saved_level_to_name)
            logging._nameToLevel.clear()
            logging._nameToLevel.update(self.saved_name_to_level)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            manager = logging.getLogger().manager
            manager.disable = 0
            loggerDict = manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
            logger_states = self.logger_states
            for name in self.logger_states:
                if logger_states[name] is not None:
                    self.saved_loggers[name].disabled = logger_states[name]
        finally:
            logging._releaseLock()

        self.doCleanups()
        support.threading_cleanup(*self._threading_key)

    def assert_log_lines(self, expected_values, stream=None, pat=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples."""
        stream = stream or self.stream
        pat = re.compile(pat or self.expected_log_pat)
        actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num


class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])

    def test_regression_22386(self):
        """See issue #22386 for more information."""
        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')

    def test_issue27935(self):
        fatal = logging.getLevelName('FATAL')
        self.assertEqual(fatal, logging.FATAL)

    def test_regression_29220(self):
        """See issue #29220 for more information."""
        logging.addLevelName(logging.INFO, '')
        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
        self.assertEqual(logging.getLevelName(logging.INFO), '')
        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)

class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def test_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.
        filter_ = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filter_)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filter_)

    def test_callable_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.

        def filterfunc(record):
            parts = record.name.split('.')
            prefix = '.'.join(parts[:2])
            return prefix == 'spam.eggs'

        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filterfunc)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filterfunc)

    def test_empty_filter(self):
        f = logging.Filter()
        r = logging.makeLogRecord({'name': 'spam.eggs'})
        self.assertTrue(f.filter(r))

#
#   First, we define our levels. There can be as many as you want - the only
#   limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged. If you need specific
#   level values which do not fit into these limitations, you can use a
#   mapping dictionary to convert between your application levels and the
#   logging system.
#
SILENT = 120
TACITURN = 119
TERSE = 118
EFFUSIVE = 117
SOCIABLE = 116
VERBOSE = 115
TALKATIVE = 114
GARRULOUS = 113
CHATTERBOX = 112
BORING = 111

LEVEL_RANGE = range(BORING, SILENT + 1)

#
#   Next, we define names for our levels. You don't need to do this - in which
#   case the system will use "Level n" to denote the text for the level.
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}

class GarrulousFilter(logging.Filter):

    """A filter which blocks garrulous messages."""

    def filter(self, record):
        return record.levelno != GARRULOUS

class VerySpecificFilter(logging.Filter):

    """A filter which blocks sociable and taciturn messages."""

    def filter(self, record):
        return record.levelno not in [SOCIABLE, TACITURN]


class CustomLevelsAndFiltersTest(BaseTest):

    """Test various filtering possibilities with custom logging levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        for k, v in my_logging_levels.items():
            logging.addLevelName(k, v)

    def log_at_all_levels(self, logger):
        for lvl in LEVEL_RANGE:
            logger.log(lvl, self.next_message())

    def test_logger_filter(self):
        # Filter at logger level.
        self.root_logger.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(self.root_logger)
        self.assert_log_lines([
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ])

    def test_handler_filter(self):
        # Filter at handler level.
        self.root_logger.handlers[0].setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines([
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ])
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # Set a specific filter object on the handler, and then add another
        # filter object on the logger itself.
        handler = self.root_logger.handlers[0]
        specific_filter = None
        garr = GarrulousFilter()
        handler.addFilter(garr)
        try:
            self.log_at_all_levels(self.root_logger)
            first_lines = [
                # Notice how 'Garrulous' is missing
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_lines)

            specific_filter = VerySpecificFilter()
            self.root_logger.addFilter(specific_filter)
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines(first_lines + [
                # Not only 'Garrulous' is still missing, but also 'Sociable'
                # and 'Taciturn'
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ])
        finally:
            if specific_filter:
                self.root_logger.removeFilter(specific_filter)
            handler.removeFilter(garr)


class HandlerTest(BaseTest):
    def test_name(self):
        h = logging.Handler()
        h.name = 'generic'
        self.assertEqual(h.name, 'generic')
        h.name = 'anothergeneric'
        self.assertEqual(h.name, 'anothergeneric')
        self.assertRaises(NotImplementedError, h.emit, None)

    def test_builtin_handlers(self):
        # We can't actually *use* too many handlers in the tests,
        # but we can try instantiating them with various options
        if sys.platform in ('linux', 'darwin'):
            for existing in (True, False):
                fd, fn = tempfile.mkstemp()
                os.close(fd)
                if not existing:
                    os.unlink(fn)
                h = logging.handlers.WatchedFileHandler(fn, delay=True)
                if existing:
                    dev, ino = h.dev, h.ino
                    self.assertEqual(dev, -1)
                    self.assertEqual(ino, -1)
                    r = logging.makeLogRecord({'msg': 'Test'})
                    h.handle(r)
                    # Now remove the file.
                    os.unlink(fn)
                    self.assertFalse(os.path.exists(fn))
                    # The next call should recreate the file.
                    h.handle(r)
                    self.assertTrue(os.path.exists(fn))
                else:
                    self.assertEqual(h.dev, -1)
                    self.assertEqual(h.ino, -1)
                h.close()
                if existing:
                    os.unlink(fn)
            if sys.platform == 'darwin':
                sockname = '/var/run/syslog'
            else:
                sockname = '/dev/log'
            try:
                h = logging.handlers.SysLogHandler(sockname)
                self.assertEqual(h.facility, h.LOG_USER)
                self.assertTrue(h.unixsocket)
                h.close()
            except OSError:  # syslogd might not be available
                pass
        for method in ('GET', 'POST', 'PUT'):
            if method == 'PUT':
                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
                                  'localhost', '/log', method)
            else:
                h = logging.handlers.HTTPHandler('localhost', '/log', method)
                h.close()
        h = logging.handlers.BufferingHandler(0)
        r = logging.makeLogRecord({})
        self.assertTrue(h.shouldFlush(r))
        h.close()
        h = logging.handlers.BufferingHandler(1)
        self.assertFalse(h.shouldFlush(r))
        h.close()

    def test_path_objects(self):
        """
        Test that Path objects are accepted as filename arguments to handlers.

        See Issue #27493.
        """
        fd, fn = tempfile.mkstemp()
        os.close(fd)
        os.unlink(fn)
        pfn = pathlib.Path(fn)
        cases = (
            (logging.FileHandler, (pfn, 'w')),
            (logging.handlers.RotatingFileHandler, (pfn, 'a')),
            (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
        )
        if sys.platform in ('linux', 'darwin'):
            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
        for cls, args in cases:
            h = cls(*args)
            self.assertTrue(os.path.exists(fn))
            h.close()
            os.unlink(fn)

    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
    def test_race(self):
        # Issue #14632 refers.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                    self.deletion_time = time.time()
                except OSError:
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        self.handle_time = None
        self.deletion_time = None

        for delay in (False, True):
            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing'})
                    try:
                        self.handle_time = time.time()
                        h.handle(r)
                    except Exception:
                        print('Deleted at %s, '
                              'opened at %s' % (self.deletion_time,
                                                self.handle_time))
                        raise
            finally:
                remover.join()
                h.close()
                if os.path.exists(fn):
                    os.unlink(fn)

    # The implementation relies on os.register_at_fork existing, but we test
    # based on os.fork existing because that is what users and this test use.
    # This helps ensure that when fork exists (the important concept) that the
    # register_at_fork mechanism is also present and used.
    @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
    def test_post_fork_child_no_deadlock(self):
        """Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
        class _OurHandler(logging.Handler):
            def __init__(self):
                super().__init__()
                self.sub_handler = logging.StreamHandler(
                    stream=open('/dev/null', 'wt'))

            def emit(self, record):
                self.sub_handler.acquire()
                try:
                    self.sub_handler.emit(record)
                finally:
                    self.sub_handler.release()

        self.assertEqual(len(logging._handlers), 0)
        refed_h = _OurHandler()
        self.addCleanup(refed_h.sub_handler.stream.close)
        refed_h.name = 'because we need at least one for this test'
        self.assertGreater(len(logging._handlers), 0)
        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
        test_logger.addHandler(refed_h)
        test_logger.setLevel(logging.DEBUG)

        locks_held__ready_to_fork = threading.Event()
        fork_happened__release_locks_and_end_thread = threading.Event()

        def lock_holder_thread_fn():
            logging._acquireLock()
            try:
                refed_h.acquire()
                try:
                    # Tell the main thread to do the fork.
                    locks_held__ready_to_fork.set()

                    # If the deadlock bug exists, the fork will happen
                    # without dealing with the locks we hold, deadlocking
                    # the child.

                    # Wait for a successful fork or an unreasonable amount of
                    # time before releasing our locks.  To avoid a timing based
                    # test we'd need communication from os.fork() as to when it
                    # has actually happened.  Given this is a regression test
                    # for a fixed issue, potentially less reliably detecting
                    # regression via timing is acceptable for simplicity.
                    # The test will always take at least this long. :(
                    fork_happened__release_locks_and_end_thread.wait(0.5)
                finally:
                    refed_h.release()
            finally:
                logging._releaseLock()

        lock_holder_thread = threading.Thread(
            target=lock_holder_thread_fn,
            name='test_post_fork_child_no_deadlock lock holder')
        lock_holder_thread.start()

        locks_held__ready_to_fork.wait()
        pid = os.fork()
        if pid == 0:  # Child.
            try:
                test_logger.info(r'Child process did not deadlock. \o/')
            finally:
                os._exit(0)
        else:  # Parent.
            test_logger.info(r'Parent process returned from fork. \o/')
            fork_happened__release_locks_and_end_thread.set()
            lock_holder_thread.join()
            start_time = time.monotonic()
            while True:
                test_logger.debug('Waiting for child process.')
                waited_pid, status = os.waitpid(pid, os.WNOHANG)
                if waited_pid == pid:
                    break  # child process exited.
                if time.monotonic() - start_time > 7:
                    break  # so long? implies child deadlock.
                time.sleep(0.05)
            test_logger.debug('Done waiting.')
            if waited_pid != pid:
                os.kill(pid, signal.SIGKILL)
                waited_pid, status = os.waitpid(pid, 0)
                self.fail("child process deadlocked.")
            self.assertEqual(status, 0, msg="child process error")


class BadStream(object):
    def write(self, data):
        raise RuntimeError('deliberate mistake')

class TestStreamHandler(logging.StreamHandler):
    def handleError(self, record):
        self.error_record = record

class StreamWithIntName(object):
    level = logging.NOTSET
    name = 2

class StreamHandlerTest(BaseTest):
    def test_error_handling(self):
        h = TestStreamHandler(BadStream())
        r = logging.makeLogRecord({})
        old_raise = logging.raiseExceptions

        try:
            h.handle(r)
            self.assertIs(h.error_record, r)

            h = logging.StreamHandler(BadStream())
            with support.captured_stderr() as stderr:
                h.handle(r)
                msg = '\nRuntimeError: deliberate mistake\n'
                self.assertIn(msg, stderr.getvalue())

            logging.raiseExceptions = False
            with support.captured_stderr() as stderr:
                h.handle(r)
                self.assertEqual('', stderr.getvalue())
        finally:
            logging.raiseExceptions = old_raise

    def test_stream_setting(self):
        """
        Test setting the handler's stream
        """
        h = logging.StreamHandler()
        stream = io.StringIO()
        old = h.setStream(stream)
        self.assertIs(old, sys.stderr)
        actual = h.setStream(old)
        self.assertIs(actual, stream)
        # test that setting to existing value returns None
        actual = h.setStream(old)
        self.assertIsNone(actual)

    def test_can_represent_stream_with_int_name(self):
        h = logging.StreamHandler(StreamWithIntName())
        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')

# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging

class TestSMTPServer(smtpd.SMTPServer):
    """
    This class implements a test SMTP server.

    :param addr: A (host, port) tuple which the server listens on.
                 You can specify a port value of zero: the server's
                 *port* attribute will hold the actual port number
                 used, which can be used in client connections.
    :param handler: A callable which will be called to process
                    incoming messages. The handler will be passed
                    the client address tuple, who the message is from,
                    a list of recipients and the message data.
    :param poll_interval: The interval, in seconds, used in the underlying
                          :func:`select` or :func:`poll` call by
                          :func:`asyncore.loop`.
    :param sockmap: A dictionary which will be used to hold
                    :class:`asyncore.dispatcher` instances used by
                    :func:`asyncore.loop`. This avoids changing the
                    :mod:`asyncore` module's global state.
    """

    def __init__(self, addr, handler, poll_interval, sockmap):
        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
                                  decode_data=True)
        self.port = self.socket.getsockname()[1]
        self._handler = handler
        self._thread = None
        self.poll_interval = poll_interval

    def process_message(self, peer, mailfrom, rcpttos, data):
        """
        Delegates to the handler passed in to the server's constructor.

        Typically, this will be a test case method.
        :param peer: The client (host, port) tuple.
        :param mailfrom: The address of the sender.
        :param rcpttos: The addresses of the recipients.
        :param data: The message.
        """
        self._handler(peer, mailfrom, rcpttos, data)

    def start(self):
        """
        Start the server running on a separate daemon thread.
        """
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        t.setDaemon(True)
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the :mod:`asyncore` loop until normal termination
        conditions arise.
        :param poll_interval: The interval, in seconds, used in the underlying
                              :func:`select` or :func:`poll` call by
                              :func:`asyncore.loop`.
        """
        asyncore.loop(poll_interval, map=self._map)

    def stop(self, timeout=None):
        """
        Stop the thread by closing the server instance.
        Wait for the server thread to terminate.

        :param timeout: How long to wait for the server thread
                        to terminate.
        """
        self.close()
        support.join_thread(self._thread, timeout)
        self._thread = None
        asyncore.close_all(map=self._map, ignore_all=True)


class ControlMixin(object):
    """
    This mixin is used to start a server on a separate thread, and
    shut it down programmatically. Request handling is simplified - instead
    of needing to derive a suitable RequestHandler subclass, you just
    provide a callable which will be passed each received request to be
    processed.

    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request. This handler is called on the
                    server thread, effectively meaning that requests are
                    processed serially. While not quite Web scale ;-),
                    this should be fine for testing applications.
    :param poll_interval: The polling interval in seconds.
    """
    def __init__(self, handler, poll_interval):
        self._thread = None
        self.poll_interval = poll_interval
        self._handler = handler
        self.ready = threading.Event()

    def start(self):
        """
        Create a daemon thread to run the server, and start it.
        """
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        t.setDaemon(True)
        t.start()

    def serve_forever(self, poll_interval):
        """
        Run the server. Set the ready flag before entering the
        service loop.
        """
        self.ready.set()
        super(ControlMixin, self).serve_forever(poll_interval)

    def stop(self, timeout=None):
        """
        Tell the server thread to stop, and wait for it to do so.

        :param timeout: How long to wait for the server thread
                        to terminate.
        """
        self.shutdown()
        if self._thread is not None:
            support.join_thread(self._thread, timeout)
            self._thread = None
        self.server_close()
        self.ready.clear()

class TestHTTPServer(ControlMixin, HTTPServer):
    """
    An HTTP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval in seconds.
    :param log: Pass ``True`` to enable log messages.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 log=False, sslctx=None):
        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
            def __getattr__(self, name, default=None):
                if name.startswith('do_'):
                    return self.process_request
                raise AttributeError(name)

            def process_request(self):
                self.server._handler(self)

            def log_message(self, format, *args):
                if log:
                    super(DelegatingHTTPRequestHandler,
                          self).log_message(format, *args)
        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
        ControlMixin.__init__(self, handler, poll_interval)
        self.sslctx = sslctx

    def get_request(self):
        try:
            sock, addr = self.socket.accept()
            if self.sslctx:
                sock = self.sslctx.wrap_socket(sock, server_side=True)
        except OSError as e:
            # socket errors are silenced by the caller, print them here
            sys.stderr.write("Got an error:\n%s\n" % e)
            raise
        return sock, addr

class TestTCPServer(ControlMixin, ThreadingTCPServer):
    """
    A TCP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a single
                    parameter - the request - in order to process the request.
    :param poll_interval: The polling interval in seconds.
    :bind_and_activate: If True (the default), binds the server and starts it
                        listening. If False, you need to call
                        :meth:`server_bind` and :meth:`server_activate` at
                        some later time before calling :meth:`start`, so that
                        the server will set up the socket and listen on it.
    """

    allow_reuse_address = True

    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingTCPRequestHandler(StreamRequestHandler):

            def handle(self):
                self.server._handler(self)
        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
                                    bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)

    def server_bind(self):
        super(TestTCPServer, self).server_bind()
        self.port = self.socket.getsockname()[1]

class TestUDPServer(ControlMixin, ThreadingUDPServer):
    """
    A UDP server which is controllable using :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds.
    :bind_and_activate: If True (the default), binds the server and
                        starts it listening. If False, you need to
                        call :meth:`server_bind` and
                        :meth:`server_activate` at some later time
                        before calling :meth:`start`, so that the server will
                        set up the socket and listen on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingUDPRequestHandler(DatagramRequestHandler):

            def handle(self):
                self.server._handler(self)

            def finish(self):
                data = self.wfile.getvalue()
                if data:
                    try:
                        super(DelegatingUDPRequestHandler, self).finish()
                    except OSError:
                        if not self.server._closed:
                            raise

        ThreadingUDPServer.__init__(self, addr,
                                    DelegatingUDPRequestHandler,
                                    bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)
        self._closed = False

    def server_bind(self):
        super(TestUDPServer, self).server_bind()
        self.port = self.socket.getsockname()[1]

    def server_close(self):
        super(TestUDPServer, self).server_close()
        self._closed = True

if hasattr(socket, "AF_UNIX"):
    class TestUnixStreamServer(TestTCPServer):
        address_family = socket.AF_UNIX

    class TestUnixDatagramServer(TestUDPServer):
        address_family = socket.AF_UNIX

# - end of server_helper section

class SMTPHandlerTest(BaseTest):
    # bpo-14314, bpo-19665, bpo-34092: don't wait forever, timeout of 1 minute
    TIMEOUT = 60.0

    def test_basic(self):
        sockmap = {}
        server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        addr = (support.HOST, server.port)
        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                         timeout=self.TIMEOUT)
        self.assertEqual(h.toaddrs, ['you'])
        self.messages = []
        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
        self.handled = threading.Event()
        h.handle(r)
        self.handled.wait(self.TIMEOUT)
        server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello \u2713'))
        h.close()

    def process_message(self, *args):
        self.messages.append(args)
        self.handled.set()

class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        # criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        for n in (4, 14):
            for i in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            # flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)

    def test_flush_on_close(self):
        """
        Test that the flush-on-close configuration works as expected.
        """
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.removeHandler(self.mem_hdlr)
        # Default behaviour is to flush on close. Check that it happens.
        self.mem_hdlr.close()
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
        ]
        self.assert_log_lines(lines)
        # Now configure for flushing not to be done on close.
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr,
                                                       False)
        self.mem_logger.addHandler(self.mem_hdlr)
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.info(self.next_message())
        self.assert_log_lines(lines)  # no change
        self.mem_logger.removeHandler(self.mem_hdlr)
        self.mem_hdlr.close()
        # assert that no new lines have been added
        self.assert_log_lines(lines)  # no change


class ExceptionFormatter(logging.Formatter):
    """A special exception formatter."""
    def formatException(self, ei):
        return "Got a [%s]" % ei[0].__name__


class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file."""

    check_no_resource_warning = support.check_no_resource_warning
    expected_log_pat = r"^(\w+) \+\+ (\w+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1a moves the handler to the root.
    config1a = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    # config7 adds a compiler logger, and uses kwargs instead of args.
    config7 = """
    [loggers]
    keys=root,parser,compiler

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_compiler]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    kwargs={'stream': sys.stdout,}

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config 8, check for resource warning
    config8 = r"""
    [loggers]
    keys=root

    [handlers]
    keys=file

    [formatters]
    keys=

    [logger_root]
    level=DEBUG
    handlers=file

    [handler_file]
    class=FileHandler
    level=DEBUG
    args=("{tempfile}",)
    """

    disable_test = """
    [loggers]
    keys=root

    [handlers]
    keys=screen

    [formatters]
    keys=

    [logger_root]
    level=DEBUG
    handlers=screen

    [handler_screen]
    level=DEBUG
    class=StreamHandler
    args=(sys.stdout,)
    formatter=
    """

    def apply_config(self, conf, **kwargs):
        file = io.StringIO(textwrap.dedent(conf))
        logging.config.fileConfig(file, **kwargs)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with support.captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config0_using_cp_ok(self):
        # A simple config file which overrides the default settings.
        with support.captured_stdout() as output:
            file = io.StringIO(textwrap.dedent(self.config0))
            cp = configparser.ConfigParser()
            cp.read_file(file)
            logging.config.fileConfig(cp)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well.
        with support.captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # A simple config file which overrides the default settings.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config3_failure(self):
        # A simple config file which overrides the default settings.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with support.captured_stdout() as output:
            self.apply_config(self.config4)
            logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        self.test_config1_ok(config=self.config6)

    def test_config7_ok(self):
        with support.captured_stdout() as output:
            self.apply_config(self.config1a)
            logger = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with support.captured_stdout() as output:
            self.apply_config(self.config7)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config8_ok(self):

        def cleanup(h1, fn):
            h1.close()
            os.remove(fn)

        with self.check_no_resource_warning():
            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
            os.close(fd)

            # Replace single backslash with double backslash in windows
            # to avoid unicode error during string formatting
            if os.name == "nt":
                fn = fn.replace("\\", "\\\\")

            config8 = self.config8.format(tempfile=fn)

            self.apply_config(config8)
            self.apply_config(config8)

        handler = logging.root.handlers[0]
        self.addCleanup(cleanup, handler, fn)

    def test_logger_disabling(self):
        self.apply_config(self.disable_test)
        logger = logging.getLogger('some_pristine_logger')
        self.assertFalse(logger.disabled)
        self.apply_config(self.disable_test)
        self.assertTrue(logger.disabled)
        self.apply_config(self.disable_test, disable_existing_loggers=False)
        self.assertFalse(logger.disabled)

    def test_defaults_do_no_interpolation(self):
        """bpo-33802 defaults should not get interpolated"""
        ini = textwrap.dedent("""
            [formatters]
            keys=default

            [formatter_default]

            [handlers]
            keys=console

            [handler_console]
            class=logging.StreamHandler
            args=tuple()

            [loggers]
            keys=root

            [logger_root]
            formatter=default
            handlers=console
            """).strip()
        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
        try:
            os.write(fd, ini.encode('ascii'))
            os.close(fd)
            logging.config.fileConfig(
                fn,
                defaults=dict(
                    version=1,
                    disable_existing_loggers=False,
                    formatters={
                        "generic": {
                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
                            "class": "logging.Formatter"
                        },
                    },
                )
            )
        finally:
            os.unlink(fn)


class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    server_class = TestTCPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_socket, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SocketHandler
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        self.handled = threading.Semaphore(0)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
            if self.server:
                self.server.stop(2.0)
        finally:
            BaseTest.tearDown(self)

    def handle_socket(self, request):
        conn = request.connection
        while True:
            chunk = conn.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = conn.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + conn.recv(slen - len(chunk))
            obj = pickle.loads(chunk)
            record = logging.makeLogRecord(obj)
            self.log_output += record.msg + '\n'
            self.handled.release()

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("tcp")
        logger.error("spam")
        self.handled.acquire()
        logger.debug("eggs")
        self.handled.acquire()
        self.assertEqual(self.log_output, "spam\neggs\n")

    def test_noserver(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # Avoid timing-related failures due to SocketHandler's own hard-wired
        # one-second timeout on socket.create_connection() (issue #16264).
        self.sock_hdlr.retryStart = 2.5
        # Kill the server
        self.server.stop(2.0)
        # The logging call should try to connect, which should fail
        try:
            raise RuntimeError('Deliberate mistake')
        except RuntimeError:
            self.root_logger.exception('Never sent')
        self.root_logger.error('Never sent, either')
        now = time.time()
        self.assertGreater(self.sock_hdlr.retryTime, now)
        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
        self.root_logger.error('Nor this')

def _get_temp_domain_socket():
    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
    os.close(fd)
    # just need a name - file can't be present, or we'll get an
    # 'address already in use' error.
    os.remove(fn)
    return fn

@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixSocketHandlerTest(SocketHandlerTest):

    """Test for SocketHandler with unix sockets."""

    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixStreamServer

    def setUp(self):
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        SocketHandlerTest.setUp(self)

    def tearDown(self):
        SocketHandlerTest.tearDown(self)
        support.unlink(self.address)

class DatagramHandlerTest(BaseTest):

    """Test for DatagramHandler."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a UDP server to receive log messages, and a DatagramHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_datagram, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.DatagramHandler
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        self.handled = threading.Event()

    def tearDown(self):
        """Shutdown the UDP server."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        slen = struct.pack('>L', 0)  # length of prefix
        packet = request.packet[len(slen):]
        obj = pickle.loads(packet)
        record = logging.makeLogRecord(obj)
        self.log_output += record.msg + '\n'
        self.handled.set()

    def test_output(self):
        # The log message sent to the DatagramHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("udp")
        logger.error("spam")
        self.handled.wait()
        self.handled.clear()
        logger.error("eggs")
        self.handled.wait()
        self.assertEqual(self.log_output, "spam\neggs\n")

@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
class UnixDatagramHandlerTest(DatagramHandlerTest):

    """Test for DatagramHandler using Unix sockets."""

    if hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        DatagramHandlerTest.setUp(self)

    def tearDown(self):
        DatagramHandlerTest.tearDown(self)
        support.unlink(self.address)

class SysLogHandlerTest(BaseTest):

    """Test for SysLogHandler using UDP."""

    server_class = TestUDPServer
    address = ('localhost', 0)

    def setUp(self):
        """Set up a UDP server to receive log messages, and a SysLogHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sl_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_datagram, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SysLogHandler
        if isinstance(server.server_address, tuple):
            self.sl_hdlr = hcls((server.server_address[0], server.port))
        else:
            self.sl_hdlr = hcls(server.server_address)
        self.log_output = ''
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sl_hdlr)
        self.handled = threading.Event()

    def tearDown(self):
        """Shutdown the server."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sl_hdlr:
                self.root_logger.removeHandler(self.sl_hdlr)
                self.sl_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_datagram(self, request):
        self.log_output = request.packet
        self.handled.set()

    def test_output(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # The log message sent to the SysLogHandler is properly received.
1885 logger = logging.getLogger("slh") 1886 logger.error("sp\xe4m") 1887 self.handled.wait() 1888 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 1889 self.handled.clear() 1890 self.sl_hdlr.append_nul = False 1891 logger.error("sp\xe4m") 1892 self.handled.wait() 1893 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') 1894 self.handled.clear() 1895 self.sl_hdlr.ident = "h\xe4m-" 1896 logger.error("sp\xe4m") 1897 self.handled.wait() 1898 self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') 1899 1900@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1901class UnixSysLogHandlerTest(SysLogHandlerTest): 1902 1903 """Test for SysLogHandler with Unix sockets.""" 1904 1905 if hasattr(socket, "AF_UNIX"): 1906 server_class = TestUnixDatagramServer 1907 1908 def setUp(self): 1909 # override the definition in the base class 1910 self.address = _get_temp_domain_socket() 1911 SysLogHandlerTest.setUp(self) 1912 1913 def tearDown(self): 1914 SysLogHandlerTest.tearDown(self) 1915 support.unlink(self.address) 1916 1917@unittest.skipUnless(support.IPV6_ENABLED, 1918 'IPv6 support required for this test.') 1919class IPv6SysLogHandlerTest(SysLogHandlerTest): 1920 1921 """Test for SysLogHandler with IPv6 host.""" 1922 1923 server_class = TestUDPServer 1924 address = ('::1', 0) 1925 1926 def setUp(self): 1927 self.server_class.address_family = socket.AF_INET6 1928 super(IPv6SysLogHandlerTest, self).setUp() 1929 1930 def tearDown(self): 1931 self.server_class.address_family = socket.AF_INET 1932 super(IPv6SysLogHandlerTest, self).tearDown() 1933 1934class HTTPHandlerTest(BaseTest): 1935 """Test for HTTPHandler.""" 1936 1937 def setUp(self): 1938 """Set up an HTTP server to receive log messages, and a HTTPHandler 1939 pointing to that server's address and port.""" 1940 BaseTest.setUp(self) 1941 self.handled = threading.Event() 1942 1943 def handle_request(self, request): 1944 self.command = request.command 1945 self.log_data = urlparse(request.path) 1946 if self.command == 'POST': 1947 try: 1948 rlen = int(request.headers['Content-Length']) 1949 self.post_data = request.rfile.read(rlen) 1950 except: 1951 self.post_data = None 1952 request.send_response(200) 1953 request.end_headers() 1954 self.handled.set() 1955 1956 def test_output(self): 1957 # The log message sent to the HTTPHandler is properly received. 
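# HTTPHandler url-encodes the record's attribute dict and sends it either as
# a GET query string or as a POST body, so the loop below exercises both
# methods and decodes the payload with parse_qs() in each case.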
1958 logger = logging.getLogger("http") 1959 root_logger = self.root_logger 1960 root_logger.removeHandler(self.root_logger.handlers[0]) 1961 for secure in (False, True): 1962 addr = ('localhost', 0) 1963 if secure: 1964 try: 1965 import ssl 1966 except ImportError: 1967 sslctx = None 1968 else: 1969 here = os.path.dirname(__file__) 1970 localhost_cert = os.path.join(here, "keycert.pem") 1971 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 1972 sslctx.load_cert_chain(localhost_cert) 1973 1974 context = ssl.create_default_context(cafile=localhost_cert) 1975 else: 1976 sslctx = None 1977 context = None 1978 self.server = server = TestHTTPServer(addr, self.handle_request, 1979 0.01, sslctx=sslctx) 1980 server.start() 1981 server.ready.wait() 1982 host = 'localhost:%d' % server.server_port 1983 secure_client = secure and sslctx 1984 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 1985 secure=secure_client, 1986 context=context, 1987 credentials=('foo', 'bar')) 1988 self.log_data = None 1989 root_logger.addHandler(self.h_hdlr) 1990 1991 for method in ('GET', 'POST'): 1992 self.h_hdlr.method = method 1993 self.handled.clear() 1994 msg = "sp\xe4m" 1995 logger.error(msg) 1996 self.handled.wait() 1997 self.assertEqual(self.log_data.path, '/frob') 1998 self.assertEqual(self.command, method) 1999 if method == 'GET': 2000 d = parse_qs(self.log_data.query) 2001 else: 2002 d = parse_qs(self.post_data.decode('utf-8')) 2003 self.assertEqual(d['name'], ['http']) 2004 self.assertEqual(d['funcName'], ['test_output']) 2005 self.assertEqual(d['msg'], [msg]) 2006 2007 self.server.stop(2.0) 2008 self.root_logger.removeHandler(self.h_hdlr) 2009 self.h_hdlr.close() 2010 2011class MemoryTest(BaseTest): 2012 2013 """Test memory persistence of logger objects.""" 2014 2015 def setUp(self): 2016 """Create a dict to remember potentially destroyed objects.""" 2017 BaseTest.setUp(self) 2018 self._survivors = {} 2019 2020 def _watch_for_survival(self, *args): 2021 """Watch the given objects for survival, by creating weakrefs to 2022 them.""" 2023 for obj in args: 2024 key = id(obj), repr(obj) 2025 self._survivors[key] = weakref.ref(obj) 2026 2027 def _assertTruesurvival(self): 2028 """Assert that all objects watched for survival have survived.""" 2029 # Trigger cycle breaking. 2030 gc.collect() 2031 dead = [] 2032 for (id_, repr_), ref in self._survivors.items(): 2033 if ref() is None: 2034 dead.append(repr_) 2035 if dead: 2036 self.fail("%d objects should have survived " 2037 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 2038 2039 def test_persistent_loggers(self): 2040 # Logger objects are persistent and retain their configuration, even 2041 # if visible references are destroyed. 2042 self.root_logger.setLevel(logging.INFO) 2043 foo = logging.getLogger("foo") 2044 self._watch_for_survival(foo) 2045 foo.setLevel(logging.DEBUG) 2046 self.root_logger.debug(self.next_message()) 2047 foo.debug(self.next_message()) 2048 self.assert_log_lines([ 2049 ('foo', 'DEBUG', '2'), 2050 ]) 2051 del foo 2052 # foo has survived. 2053 self._assertTruesurvival() 2054 # foo has retained its settings. 2055 bar = logging.getLogger("foo") 2056 bar.debug(self.next_message()) 2057 self.assert_log_lines([ 2058 ('foo', 'DEBUG', '2'), 2059 ('foo', 'DEBUG', '3'), 2060 ]) 2061 2062 2063class EncodingTest(BaseTest): 2064 def test_encoding_plain_file(self): 2065 # In Python 2.x, a plain file object is treated as having no encoding. 
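# Under Python 3 the FileHandler below is created with an explicit utf-8
# encoding, so the non-ASCII payload is expected to round-trip through the
# log file unchanged.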
2066 log = logging.getLogger("test") 2067 fd, fn = tempfile.mkstemp(".log", "test_logging-1-") 2068 os.close(fd) 2069 # the non-ascii data we write to the log. 2070 data = "foo\x80" 2071 try: 2072 handler = logging.FileHandler(fn, encoding="utf-8") 2073 log.addHandler(handler) 2074 try: 2075 # write non-ascii data to the log. 2076 log.warning(data) 2077 finally: 2078 log.removeHandler(handler) 2079 handler.close() 2080 # check we wrote exactly those bytes, ignoring trailing \n etc 2081 f = open(fn, encoding="utf-8") 2082 try: 2083 self.assertEqual(f.read().rstrip(), data) 2084 finally: 2085 f.close() 2086 finally: 2087 if os.path.isfile(fn): 2088 os.remove(fn) 2089 2090 def test_encoding_cyrillic_unicode(self): 2091 log = logging.getLogger("test") 2092 # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) 2093 message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' 2094 # Ensure it's written in a Cyrillic encoding 2095 writer_class = codecs.getwriter('cp1251') 2096 writer_class.encoding = 'cp1251' 2097 stream = io.BytesIO() 2098 writer = writer_class(stream, 'strict') 2099 handler = logging.StreamHandler(writer) 2100 log.addHandler(handler) 2101 try: 2102 log.warning(message) 2103 finally: 2104 log.removeHandler(handler) 2105 handler.close() 2106 # check we wrote exactly those bytes, ignoring trailing \n etc 2107 s = stream.getvalue() 2108 # Compare against what the data should be when encoded in CP-1251 2109 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') 2110 2111 2112class WarningsTest(BaseTest): 2113 2114 def test_warnings(self): 2115 with warnings.catch_warnings(): 2116 logging.captureWarnings(True) 2117 self.addCleanup(logging.captureWarnings, False) 2118 warnings.filterwarnings("always", category=UserWarning) 2119 stream = io.StringIO() 2120 h = logging.StreamHandler(stream) 2121 logger = logging.getLogger("py.warnings") 2122 logger.addHandler(h) 2123 warnings.warn("I'm warning you...") 2124 logger.removeHandler(h) 2125 s = stream.getvalue() 2126 h.close() 2127 self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) 2128 2129 # See if an explicit file uses the original implementation 2130 a_file = io.StringIO() 2131 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, 2132 a_file, "Dummy line") 2133 s = a_file.getvalue() 2134 a_file.close() 2135 self.assertEqual(s, 2136 "dummy.py:42: UserWarning: Explicit\n Dummy line\n") 2137 2138 def test_warnings_no_handlers(self): 2139 with warnings.catch_warnings(): 2140 logging.captureWarnings(True) 2141 self.addCleanup(logging.captureWarnings, False) 2142 2143 # confirm our assumption: no loggers are set 2144 logger = logging.getLogger("py.warnings") 2145 self.assertEqual(logger.handlers, []) 2146 2147 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) 2148 self.assertEqual(len(logger.handlers), 1) 2149 self.assertIsInstance(logger.handlers[0], logging.NullHandler) 2150 2151 2152def formatFunc(format, datefmt=None): 2153 return logging.Formatter(format, datefmt) 2154 2155class myCustomFormatter: 2156 def __init__(self, fmt, datefmt=None): 2157 pass 2158 2159def handlerFunc(): 2160 return logging.StreamHandler() 2161 2162class CustomHandler(logging.StreamHandler): 2163 pass 2164 2165class ConfigDictTest(BaseTest): 2166 2167 """Reading logging config from a dictionary.""" 2168 2169 check_no_resource_warning = support.check_no_resource_warning 2170 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 2171 2172 # config0 is a standard configuration. 
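# Each fixture below is a dictConfig() schema: a mandatory 'version' key plus
# optional 'formatters', 'filters', 'handlers', 'loggers' and 'root' sections,
# with handlers referring to formatters/filters by name and loggers referring
# to handlers by name. A minimal sketch of how the tests consume them
# (apply_config() further down simply wraps this call):
#
#   import logging.config
#   logging.config.dictConfig(ConfigDictTest.config0)
#   logging.getLogger().error("hello")   # -> "ERROR ++ hello" on stdout
#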
2173 config0 = { 2174 'version': 1, 2175 'formatters': { 2176 'form1' : { 2177 'format' : '%(levelname)s ++ %(message)s', 2178 }, 2179 }, 2180 'handlers' : { 2181 'hand1' : { 2182 'class' : 'logging.StreamHandler', 2183 'formatter' : 'form1', 2184 'level' : 'NOTSET', 2185 'stream' : 'ext://sys.stdout', 2186 }, 2187 }, 2188 'root' : { 2189 'level' : 'WARNING', 2190 'handlers' : ['hand1'], 2191 }, 2192 } 2193 2194 # config1 adds a little to the standard configuration. 2195 config1 = { 2196 'version': 1, 2197 'formatters': { 2198 'form1' : { 2199 'format' : '%(levelname)s ++ %(message)s', 2200 }, 2201 }, 2202 'handlers' : { 2203 'hand1' : { 2204 'class' : 'logging.StreamHandler', 2205 'formatter' : 'form1', 2206 'level' : 'NOTSET', 2207 'stream' : 'ext://sys.stdout', 2208 }, 2209 }, 2210 'loggers' : { 2211 'compiler.parser' : { 2212 'level' : 'DEBUG', 2213 'handlers' : ['hand1'], 2214 }, 2215 }, 2216 'root' : { 2217 'level' : 'WARNING', 2218 }, 2219 } 2220 2221 # config1a moves the handler to the root. Used with config8a 2222 config1a = { 2223 'version': 1, 2224 'formatters': { 2225 'form1' : { 2226 'format' : '%(levelname)s ++ %(message)s', 2227 }, 2228 }, 2229 'handlers' : { 2230 'hand1' : { 2231 'class' : 'logging.StreamHandler', 2232 'formatter' : 'form1', 2233 'level' : 'NOTSET', 2234 'stream' : 'ext://sys.stdout', 2235 }, 2236 }, 2237 'loggers' : { 2238 'compiler.parser' : { 2239 'level' : 'DEBUG', 2240 }, 2241 }, 2242 'root' : { 2243 'level' : 'WARNING', 2244 'handlers' : ['hand1'], 2245 }, 2246 } 2247 2248 # config2 has a subtle configuration error that should be reported 2249 config2 = { 2250 'version': 1, 2251 'formatters': { 2252 'form1' : { 2253 'format' : '%(levelname)s ++ %(message)s', 2254 }, 2255 }, 2256 'handlers' : { 2257 'hand1' : { 2258 'class' : 'logging.StreamHandler', 2259 'formatter' : 'form1', 2260 'level' : 'NOTSET', 2261 'stream' : 'ext://sys.stdbout', 2262 }, 2263 }, 2264 'loggers' : { 2265 'compiler.parser' : { 2266 'level' : 'DEBUG', 2267 'handlers' : ['hand1'], 2268 }, 2269 }, 2270 'root' : { 2271 'level' : 'WARNING', 2272 }, 2273 } 2274 2275 # As config1 but with a misspelt level on a handler 2276 config2a = { 2277 'version': 1, 2278 'formatters': { 2279 'form1' : { 2280 'format' : '%(levelname)s ++ %(message)s', 2281 }, 2282 }, 2283 'handlers' : { 2284 'hand1' : { 2285 'class' : 'logging.StreamHandler', 2286 'formatter' : 'form1', 2287 'level' : 'NTOSET', 2288 'stream' : 'ext://sys.stdout', 2289 }, 2290 }, 2291 'loggers' : { 2292 'compiler.parser' : { 2293 'level' : 'DEBUG', 2294 'handlers' : ['hand1'], 2295 }, 2296 }, 2297 'root' : { 2298 'level' : 'WARNING', 2299 }, 2300 } 2301 2302 2303 # As config1 but with a misspelt level on a logger 2304 config2b = { 2305 'version': 1, 2306 'formatters': { 2307 'form1' : { 2308 'format' : '%(levelname)s ++ %(message)s', 2309 }, 2310 }, 2311 'handlers' : { 2312 'hand1' : { 2313 'class' : 'logging.StreamHandler', 2314 'formatter' : 'form1', 2315 'level' : 'NOTSET', 2316 'stream' : 'ext://sys.stdout', 2317 }, 2318 }, 2319 'loggers' : { 2320 'compiler.parser' : { 2321 'level' : 'DEBUG', 2322 'handlers' : ['hand1'], 2323 }, 2324 }, 2325 'root' : { 2326 'level' : 'WRANING', 2327 }, 2328 } 2329 2330 # config3 has a less subtle configuration error 2331 config3 = { 2332 'version': 1, 2333 'formatters': { 2334 'form1' : { 2335 'format' : '%(levelname)s ++ %(message)s', 2336 }, 2337 }, 2338 'handlers' : { 2339 'hand1' : { 2340 'class' : 'logging.StreamHandler', 2341 'formatter' : 'misspelled_name', 2342 'level' : 'NOTSET', 2343 
'stream' : 'ext://sys.stdout', 2344 }, 2345 }, 2346 'loggers' : { 2347 'compiler.parser' : { 2348 'level' : 'DEBUG', 2349 'handlers' : ['hand1'], 2350 }, 2351 }, 2352 'root' : { 2353 'level' : 'WARNING', 2354 }, 2355 } 2356 2357 # config4 specifies a custom formatter class to be loaded 2358 config4 = { 2359 'version': 1, 2360 'formatters': { 2361 'form1' : { 2362 '()' : __name__ + '.ExceptionFormatter', 2363 'format' : '%(levelname)s:%(name)s:%(message)s', 2364 }, 2365 }, 2366 'handlers' : { 2367 'hand1' : { 2368 'class' : 'logging.StreamHandler', 2369 'formatter' : 'form1', 2370 'level' : 'NOTSET', 2371 'stream' : 'ext://sys.stdout', 2372 }, 2373 }, 2374 'root' : { 2375 'level' : 'NOTSET', 2376 'handlers' : ['hand1'], 2377 }, 2378 } 2379 2380 # As config4 but using an actual callable rather than a string 2381 config4a = { 2382 'version': 1, 2383 'formatters': { 2384 'form1' : { 2385 '()' : ExceptionFormatter, 2386 'format' : '%(levelname)s:%(name)s:%(message)s', 2387 }, 2388 'form2' : { 2389 '()' : __name__ + '.formatFunc', 2390 'format' : '%(levelname)s:%(name)s:%(message)s', 2391 }, 2392 'form3' : { 2393 '()' : formatFunc, 2394 'format' : '%(levelname)s:%(name)s:%(message)s', 2395 }, 2396 }, 2397 'handlers' : { 2398 'hand1' : { 2399 'class' : 'logging.StreamHandler', 2400 'formatter' : 'form1', 2401 'level' : 'NOTSET', 2402 'stream' : 'ext://sys.stdout', 2403 }, 2404 'hand2' : { 2405 '()' : handlerFunc, 2406 }, 2407 }, 2408 'root' : { 2409 'level' : 'NOTSET', 2410 'handlers' : ['hand1'], 2411 }, 2412 } 2413 2414 # config5 specifies a custom handler class to be loaded 2415 config5 = { 2416 'version': 1, 2417 'formatters': { 2418 'form1' : { 2419 'format' : '%(levelname)s ++ %(message)s', 2420 }, 2421 }, 2422 'handlers' : { 2423 'hand1' : { 2424 'class' : __name__ + '.CustomHandler', 2425 'formatter' : 'form1', 2426 'level' : 'NOTSET', 2427 'stream' : 'ext://sys.stdout', 2428 }, 2429 }, 2430 'loggers' : { 2431 'compiler.parser' : { 2432 'level' : 'DEBUG', 2433 'handlers' : ['hand1'], 2434 }, 2435 }, 2436 'root' : { 2437 'level' : 'WARNING', 2438 }, 2439 } 2440 2441 # config6 specifies a custom handler class to be loaded 2442 # but has bad arguments 2443 config6 = { 2444 'version': 1, 2445 'formatters': { 2446 'form1' : { 2447 'format' : '%(levelname)s ++ %(message)s', 2448 }, 2449 }, 2450 'handlers' : { 2451 'hand1' : { 2452 'class' : __name__ + '.CustomHandler', 2453 'formatter' : 'form1', 2454 'level' : 'NOTSET', 2455 'stream' : 'ext://sys.stdout', 2456 '9' : 'invalid parameter name', 2457 }, 2458 }, 2459 'loggers' : { 2460 'compiler.parser' : { 2461 'level' : 'DEBUG', 2462 'handlers' : ['hand1'], 2463 }, 2464 }, 2465 'root' : { 2466 'level' : 'WARNING', 2467 }, 2468 } 2469 2470 # config 7 does not define compiler.parser but defines compiler.lexer 2471 # so compiler.parser should be disabled after applying it 2472 config7 = { 2473 'version': 1, 2474 'formatters': { 2475 'form1' : { 2476 'format' : '%(levelname)s ++ %(message)s', 2477 }, 2478 }, 2479 'handlers' : { 2480 'hand1' : { 2481 'class' : 'logging.StreamHandler', 2482 'formatter' : 'form1', 2483 'level' : 'NOTSET', 2484 'stream' : 'ext://sys.stdout', 2485 }, 2486 }, 2487 'loggers' : { 2488 'compiler.lexer' : { 2489 'level' : 'DEBUG', 2490 'handlers' : ['hand1'], 2491 }, 2492 }, 2493 'root' : { 2494 'level' : 'WARNING', 2495 }, 2496 } 2497 2498 # config8 defines both compiler and compiler.lexer 2499 # so compiler.parser should not be disabled (since 2500 # compiler is defined) 2501 config8 = { 2502 'version': 1, 2503 
'disable_existing_loggers' : False, 2504 'formatters': { 2505 'form1' : { 2506 'format' : '%(levelname)s ++ %(message)s', 2507 }, 2508 }, 2509 'handlers' : { 2510 'hand1' : { 2511 'class' : 'logging.StreamHandler', 2512 'formatter' : 'form1', 2513 'level' : 'NOTSET', 2514 'stream' : 'ext://sys.stdout', 2515 }, 2516 }, 2517 'loggers' : { 2518 'compiler' : { 2519 'level' : 'DEBUG', 2520 'handlers' : ['hand1'], 2521 }, 2522 'compiler.lexer' : { 2523 }, 2524 }, 2525 'root' : { 2526 'level' : 'WARNING', 2527 }, 2528 } 2529 2530 # config8a disables existing loggers 2531 config8a = { 2532 'version': 1, 2533 'disable_existing_loggers' : True, 2534 'formatters': { 2535 'form1' : { 2536 'format' : '%(levelname)s ++ %(message)s', 2537 }, 2538 }, 2539 'handlers' : { 2540 'hand1' : { 2541 'class' : 'logging.StreamHandler', 2542 'formatter' : 'form1', 2543 'level' : 'NOTSET', 2544 'stream' : 'ext://sys.stdout', 2545 }, 2546 }, 2547 'loggers' : { 2548 'compiler' : { 2549 'level' : 'DEBUG', 2550 'handlers' : ['hand1'], 2551 }, 2552 'compiler.lexer' : { 2553 }, 2554 }, 2555 'root' : { 2556 'level' : 'WARNING', 2557 }, 2558 } 2559 2560 config9 = { 2561 'version': 1, 2562 'formatters': { 2563 'form1' : { 2564 'format' : '%(levelname)s ++ %(message)s', 2565 }, 2566 }, 2567 'handlers' : { 2568 'hand1' : { 2569 'class' : 'logging.StreamHandler', 2570 'formatter' : 'form1', 2571 'level' : 'WARNING', 2572 'stream' : 'ext://sys.stdout', 2573 }, 2574 }, 2575 'loggers' : { 2576 'compiler.parser' : { 2577 'level' : 'WARNING', 2578 'handlers' : ['hand1'], 2579 }, 2580 }, 2581 'root' : { 2582 'level' : 'NOTSET', 2583 }, 2584 } 2585 2586 config9a = { 2587 'version': 1, 2588 'incremental' : True, 2589 'handlers' : { 2590 'hand1' : { 2591 'level' : 'WARNING', 2592 }, 2593 }, 2594 'loggers' : { 2595 'compiler.parser' : { 2596 'level' : 'INFO', 2597 }, 2598 }, 2599 } 2600 2601 config9b = { 2602 'version': 1, 2603 'incremental' : True, 2604 'handlers' : { 2605 'hand1' : { 2606 'level' : 'INFO', 2607 }, 2608 }, 2609 'loggers' : { 2610 'compiler.parser' : { 2611 'level' : 'INFO', 2612 }, 2613 }, 2614 } 2615 2616 # As config1 but with a filter added 2617 config10 = { 2618 'version': 1, 2619 'formatters': { 2620 'form1' : { 2621 'format' : '%(levelname)s ++ %(message)s', 2622 }, 2623 }, 2624 'filters' : { 2625 'filt1' : { 2626 'name' : 'compiler.parser', 2627 }, 2628 }, 2629 'handlers' : { 2630 'hand1' : { 2631 'class' : 'logging.StreamHandler', 2632 'formatter' : 'form1', 2633 'level' : 'NOTSET', 2634 'stream' : 'ext://sys.stdout', 2635 'filters' : ['filt1'], 2636 }, 2637 }, 2638 'loggers' : { 2639 'compiler.parser' : { 2640 'level' : 'DEBUG', 2641 'filters' : ['filt1'], 2642 }, 2643 }, 2644 'root' : { 2645 'level' : 'WARNING', 2646 'handlers' : ['hand1'], 2647 }, 2648 } 2649 2650 # As config1 but using cfg:// references 2651 config11 = { 2652 'version': 1, 2653 'true_formatters': { 2654 'form1' : { 2655 'format' : '%(levelname)s ++ %(message)s', 2656 }, 2657 }, 2658 'handler_configs': { 2659 'hand1' : { 2660 'class' : 'logging.StreamHandler', 2661 'formatter' : 'form1', 2662 'level' : 'NOTSET', 2663 'stream' : 'ext://sys.stdout', 2664 }, 2665 }, 2666 'formatters' : 'cfg://true_formatters', 2667 'handlers' : { 2668 'hand1' : 'cfg://handler_configs[hand1]', 2669 }, 2670 'loggers' : { 2671 'compiler.parser' : { 2672 'level' : 'DEBUG', 2673 'handlers' : ['hand1'], 2674 }, 2675 }, 2676 'root' : { 2677 'level' : 'WARNING', 2678 }, 2679 } 2680 2681 # As config11 but missing the version key 2682 config12 = { 2683 'true_formatters': { 
2684 'form1' : { 2685 'format' : '%(levelname)s ++ %(message)s', 2686 }, 2687 }, 2688 'handler_configs': { 2689 'hand1' : { 2690 'class' : 'logging.StreamHandler', 2691 'formatter' : 'form1', 2692 'level' : 'NOTSET', 2693 'stream' : 'ext://sys.stdout', 2694 }, 2695 }, 2696 'formatters' : 'cfg://true_formatters', 2697 'handlers' : { 2698 'hand1' : 'cfg://handler_configs[hand1]', 2699 }, 2700 'loggers' : { 2701 'compiler.parser' : { 2702 'level' : 'DEBUG', 2703 'handlers' : ['hand1'], 2704 }, 2705 }, 2706 'root' : { 2707 'level' : 'WARNING', 2708 }, 2709 } 2710 2711 # As config11 but using an unsupported version 2712 config13 = { 2713 'version': 2, 2714 'true_formatters': { 2715 'form1' : { 2716 'format' : '%(levelname)s ++ %(message)s', 2717 }, 2718 }, 2719 'handler_configs': { 2720 'hand1' : { 2721 'class' : 'logging.StreamHandler', 2722 'formatter' : 'form1', 2723 'level' : 'NOTSET', 2724 'stream' : 'ext://sys.stdout', 2725 }, 2726 }, 2727 'formatters' : 'cfg://true_formatters', 2728 'handlers' : { 2729 'hand1' : 'cfg://handler_configs[hand1]', 2730 }, 2731 'loggers' : { 2732 'compiler.parser' : { 2733 'level' : 'DEBUG', 2734 'handlers' : ['hand1'], 2735 }, 2736 }, 2737 'root' : { 2738 'level' : 'WARNING', 2739 }, 2740 } 2741 2742 # As config0, but with properties 2743 config14 = { 2744 'version': 1, 2745 'formatters': { 2746 'form1' : { 2747 'format' : '%(levelname)s ++ %(message)s', 2748 }, 2749 }, 2750 'handlers' : { 2751 'hand1' : { 2752 'class' : 'logging.StreamHandler', 2753 'formatter' : 'form1', 2754 'level' : 'NOTSET', 2755 'stream' : 'ext://sys.stdout', 2756 '.': { 2757 'foo': 'bar', 2758 'terminator': '!\n', 2759 } 2760 }, 2761 }, 2762 'root' : { 2763 'level' : 'WARNING', 2764 'handlers' : ['hand1'], 2765 }, 2766 } 2767 2768 out_of_order = { 2769 "version": 1, 2770 "formatters": { 2771 "mySimpleFormatter": { 2772 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2773 "style": "$" 2774 } 2775 }, 2776 "handlers": { 2777 "fileGlobal": { 2778 "class": "logging.StreamHandler", 2779 "level": "DEBUG", 2780 "formatter": "mySimpleFormatter" 2781 }, 2782 "bufferGlobal": { 2783 "class": "logging.handlers.MemoryHandler", 2784 "capacity": 5, 2785 "formatter": "mySimpleFormatter", 2786 "target": "fileGlobal", 2787 "level": "DEBUG" 2788 } 2789 }, 2790 "loggers": { 2791 "mymodule": { 2792 "level": "DEBUG", 2793 "handlers": ["bufferGlobal"], 2794 "propagate": "true" 2795 } 2796 } 2797 } 2798 2799 # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False 2800 custom_formatter_class_validate = { 2801 'version': 1, 2802 'formatters': { 2803 'form1': { 2804 '()': __name__ + '.ExceptionFormatter', 2805 'format': '%(levelname)s:%(name)s:%(message)s', 2806 'validate': False, 2807 }, 2808 }, 2809 'handlers' : { 2810 'hand1' : { 2811 'class': 'logging.StreamHandler', 2812 'formatter': 'form1', 2813 'level': 'NOTSET', 2814 'stream': 'ext://sys.stdout', 2815 }, 2816 }, 2817 "loggers": { 2818 "my_test_logger_custom_formatter": { 2819 "level": "DEBUG", 2820 "handlers": ["hand1"], 2821 "propagate": "true" 2822 } 2823 } 2824 } 2825 2826 # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False 2827 custom_formatter_class_validate2 = { 2828 'version': 1, 2829 'formatters': { 2830 'form1': { 2831 'class': __name__ + '.ExceptionFormatter', 2832 'format': '%(levelname)s:%(name)s:%(message)s', 2833 'validate': False, 2834 }, 2835 }, 2836 'handlers' : { 2837 'hand1' : { 2838 'class': 'logging.StreamHandler', 2839 'formatter': 
'form1', 2840 'level': 'NOTSET', 2841 'stream': 'ext://sys.stdout', 2842 }, 2843 }, 2844 "loggers": { 2845 "my_test_logger_custom_formatter": { 2846 "level": "DEBUG", 2847 "handlers": ["hand1"], 2848 "propagate": "true" 2849 } 2850 } 2851 } 2852 2853 # Configuration with custom class that is not inherited from logging.Formatter 2854 custom_formatter_class_validate3 = { 2855 'version': 1, 2856 'formatters': { 2857 'form1': { 2858 'class': __name__ + '.myCustomFormatter', 2859 'format': '%(levelname)s:%(name)s:%(message)s', 2860 'validate': False, 2861 }, 2862 }, 2863 'handlers' : { 2864 'hand1' : { 2865 'class': 'logging.StreamHandler', 2866 'formatter': 'form1', 2867 'level': 'NOTSET', 2868 'stream': 'ext://sys.stdout', 2869 }, 2870 }, 2871 "loggers": { 2872 "my_test_logger_custom_formatter": { 2873 "level": "DEBUG", 2874 "handlers": ["hand1"], 2875 "propagate": "true" 2876 } 2877 } 2878 } 2879 2880 # Configuration with custom function and 'validate' set to False 2881 custom_formatter_with_function = { 2882 'version': 1, 2883 'formatters': { 2884 'form1': { 2885 '()': formatFunc, 2886 'format': '%(levelname)s:%(name)s:%(message)s', 2887 'validate': False, 2888 }, 2889 }, 2890 'handlers' : { 2891 'hand1' : { 2892 'class': 'logging.StreamHandler', 2893 'formatter': 'form1', 2894 'level': 'NOTSET', 2895 'stream': 'ext://sys.stdout', 2896 }, 2897 }, 2898 "loggers": { 2899 "my_test_logger_custom_formatter": { 2900 "level": "DEBUG", 2901 "handlers": ["hand1"], 2902 "propagate": "true" 2903 } 2904 } 2905 } 2906 2907 def apply_config(self, conf): 2908 logging.config.dictConfig(conf) 2909 2910 def test_config0_ok(self): 2911 # A simple config which overrides the default settings. 2912 with support.captured_stdout() as output: 2913 self.apply_config(self.config0) 2914 logger = logging.getLogger() 2915 # Won't output anything 2916 logger.info(self.next_message()) 2917 # Outputs a message 2918 logger.error(self.next_message()) 2919 self.assert_log_lines([ 2920 ('ERROR', '2'), 2921 ], stream=output) 2922 # Original logger output is empty. 2923 self.assert_log_lines([]) 2924 2925 def test_config1_ok(self, config=config1): 2926 # A config defining a sub-parser as well. 2927 with support.captured_stdout() as output: 2928 self.apply_config(config) 2929 logger = logging.getLogger("compiler.parser") 2930 # Both will output a message 2931 logger.info(self.next_message()) 2932 logger.error(self.next_message()) 2933 self.assert_log_lines([ 2934 ('INFO', '1'), 2935 ('ERROR', '2'), 2936 ], stream=output) 2937 # Original logger output is empty. 2938 self.assert_log_lines([]) 2939 2940 def test_config2_failure(self): 2941 # A simple config which overrides the default settings. 2942 self.assertRaises(Exception, self.apply_config, self.config2) 2943 2944 def test_config2a_failure(self): 2945 # A simple config which overrides the default settings. 2946 self.assertRaises(Exception, self.apply_config, self.config2a) 2947 2948 def test_config2b_failure(self): 2949 # A simple config which overrides the default settings. 2950 self.assertRaises(Exception, self.apply_config, self.config2b) 2951 2952 def test_config3_failure(self): 2953 # A simple config which overrides the default settings. 2954 self.assertRaises(Exception, self.apply_config, self.config3) 2955 2956 def test_config4_ok(self): 2957 # A config specifying a custom formatter class. 
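# config4 builds its formatter through the '()' factory key, so dictConfig()
# instantiates this module's ExceptionFormatter rather than logging.Formatter;
# that class is what adds the "Got a [RuntimeError]" line asserted below.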
2958 with support.captured_stdout() as output: 2959 self.apply_config(self.config4) 2960 #logger = logging.getLogger() 2961 try: 2962 raise RuntimeError() 2963 except RuntimeError: 2964 logging.exception("just testing") 2965 sys.stdout.seek(0) 2966 self.assertEqual(output.getvalue(), 2967 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2968 # Original logger output is empty 2969 self.assert_log_lines([]) 2970 2971 def test_config4a_ok(self): 2972 # A config specifying a custom formatter class. 2973 with support.captured_stdout() as output: 2974 self.apply_config(self.config4a) 2975 #logger = logging.getLogger() 2976 try: 2977 raise RuntimeError() 2978 except RuntimeError: 2979 logging.exception("just testing") 2980 sys.stdout.seek(0) 2981 self.assertEqual(output.getvalue(), 2982 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2983 # Original logger output is empty 2984 self.assert_log_lines([]) 2985 2986 def test_config5_ok(self): 2987 self.test_config1_ok(config=self.config5) 2988 2989 def test_config6_failure(self): 2990 self.assertRaises(Exception, self.apply_config, self.config6) 2991 2992 def test_config7_ok(self): 2993 with support.captured_stdout() as output: 2994 self.apply_config(self.config1) 2995 logger = logging.getLogger("compiler.parser") 2996 # Both will output a message 2997 logger.info(self.next_message()) 2998 logger.error(self.next_message()) 2999 self.assert_log_lines([ 3000 ('INFO', '1'), 3001 ('ERROR', '2'), 3002 ], stream=output) 3003 # Original logger output is empty. 3004 self.assert_log_lines([]) 3005 with support.captured_stdout() as output: 3006 self.apply_config(self.config7) 3007 logger = logging.getLogger("compiler.parser") 3008 self.assertTrue(logger.disabled) 3009 logger = logging.getLogger("compiler.lexer") 3010 # Both will output a message 3011 logger.info(self.next_message()) 3012 logger.error(self.next_message()) 3013 self.assert_log_lines([ 3014 ('INFO', '3'), 3015 ('ERROR', '4'), 3016 ], stream=output) 3017 # Original logger output is empty. 3018 self.assert_log_lines([]) 3019 3020 # Same as test_config_7_ok but don't disable old loggers. 3021 def test_config_8_ok(self): 3022 with support.captured_stdout() as output: 3023 self.apply_config(self.config1) 3024 logger = logging.getLogger("compiler.parser") 3025 # All will output a message 3026 logger.info(self.next_message()) 3027 logger.error(self.next_message()) 3028 self.assert_log_lines([ 3029 ('INFO', '1'), 3030 ('ERROR', '2'), 3031 ], stream=output) 3032 # Original logger output is empty. 3033 self.assert_log_lines([]) 3034 with support.captured_stdout() as output: 3035 self.apply_config(self.config8) 3036 logger = logging.getLogger("compiler.parser") 3037 self.assertFalse(logger.disabled) 3038 # Both will output a message 3039 logger.info(self.next_message()) 3040 logger.error(self.next_message()) 3041 logger = logging.getLogger("compiler.lexer") 3042 # Both will output a message 3043 logger.info(self.next_message()) 3044 logger.error(self.next_message()) 3045 self.assert_log_lines([ 3046 ('INFO', '3'), 3047 ('ERROR', '4'), 3048 ('INFO', '5'), 3049 ('ERROR', '6'), 3050 ], stream=output) 3051 # Original logger output is empty. 3052 self.assert_log_lines([]) 3053 3054 def test_config_8a_ok(self): 3055 with support.captured_stdout() as output: 3056 self.apply_config(self.config1a) 3057 logger = logging.getLogger("compiler.parser") 3058 # See issue #11424. 
compiler-hyphenated sorts 3059 # between compiler and compiler.xyz and this 3060 # was preventing compiler.xyz from being included 3061 # in the child loggers of compiler because of an 3062 # overzealous loop termination condition. 3063 hyphenated = logging.getLogger('compiler-hyphenated') 3064 # All will output a message 3065 logger.info(self.next_message()) 3066 logger.error(self.next_message()) 3067 hyphenated.critical(self.next_message()) 3068 self.assert_log_lines([ 3069 ('INFO', '1'), 3070 ('ERROR', '2'), 3071 ('CRITICAL', '3'), 3072 ], stream=output) 3073 # Original logger output is empty. 3074 self.assert_log_lines([]) 3075 with support.captured_stdout() as output: 3076 self.apply_config(self.config8a) 3077 logger = logging.getLogger("compiler.parser") 3078 self.assertFalse(logger.disabled) 3079 # Both will output a message 3080 logger.info(self.next_message()) 3081 logger.error(self.next_message()) 3082 logger = logging.getLogger("compiler.lexer") 3083 # Both will output a message 3084 logger.info(self.next_message()) 3085 logger.error(self.next_message()) 3086 # Will not appear 3087 hyphenated.critical(self.next_message()) 3088 self.assert_log_lines([ 3089 ('INFO', '4'), 3090 ('ERROR', '5'), 3091 ('INFO', '6'), 3092 ('ERROR', '7'), 3093 ], stream=output) 3094 # Original logger output is empty. 3095 self.assert_log_lines([]) 3096 3097 def test_config_9_ok(self): 3098 with support.captured_stdout() as output: 3099 self.apply_config(self.config9) 3100 logger = logging.getLogger("compiler.parser") 3101 # Nothing will be output since both handler and logger are set to WARNING 3102 logger.info(self.next_message()) 3103 self.assert_log_lines([], stream=output) 3104 self.apply_config(self.config9a) 3105 # Nothing will be output since handler is still set to WARNING 3106 logger.info(self.next_message()) 3107 self.assert_log_lines([], stream=output) 3108 self.apply_config(self.config9b) 3109 # Message should now be output 3110 logger.info(self.next_message()) 3111 self.assert_log_lines([ 3112 ('INFO', '3'), 3113 ], stream=output) 3114 3115 def test_config_10_ok(self): 3116 with support.captured_stdout() as output: 3117 self.apply_config(self.config10) 3118 logger = logging.getLogger("compiler.parser") 3119 logger.warning(self.next_message()) 3120 logger = logging.getLogger('compiler') 3121 # Not output, because filtered 3122 logger.warning(self.next_message()) 3123 logger = logging.getLogger('compiler.lexer') 3124 # Not output, because filtered 3125 logger.warning(self.next_message()) 3126 logger = logging.getLogger("compiler.parser.codegen") 3127 # Output, as not filtered 3128 logger.error(self.next_message()) 3129 self.assert_log_lines([ 3130 ('WARNING', '1'), 3131 ('ERROR', '4'), 3132 ], stream=output) 3133 3134 def test_config11_ok(self): 3135 self.test_config1_ok(self.config11) 3136 3137 def test_config12_failure(self): 3138 self.assertRaises(Exception, self.apply_config, self.config12) 3139 3140 def test_config13_failure(self): 3141 self.assertRaises(Exception, self.apply_config, self.config13) 3142 3143 def test_config14_ok(self): 3144 with support.captured_stdout() as output: 3145 self.apply_config(self.config14) 3146 h = logging._handlers['hand1'] 3147 self.assertEqual(h.foo, 'bar') 3148 self.assertEqual(h.terminator, '!\n') 3149 logging.warning('Exclamation') 3150 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 3151 3152 def test_config15_ok(self): 3153 3154 def cleanup(h1, fn): 3155 h1.close() 3156 os.remove(fn) 3157 3158 with self.check_no_resource_warning(): 3159 
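# Applying the same FileHandler configuration twice must close the first
# handler's stream; a leaked stream would show up as a ResourceWarning and
# trip the check_no_resource_warning() context manager.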
fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 3160 os.close(fd) 3161 3162 config = { 3163 "version": 1, 3164 "handlers": { 3165 "file": { 3166 "class": "logging.FileHandler", 3167 "filename": fn 3168 } 3169 }, 3170 "root": { 3171 "handlers": ["file"] 3172 } 3173 } 3174 3175 self.apply_config(config) 3176 self.apply_config(config) 3177 3178 handler = logging.root.handlers[0] 3179 self.addCleanup(cleanup, handler, fn) 3180 3181 def setup_via_listener(self, text, verify=None): 3182 text = text.encode("utf-8") 3183 # Ask for a randomly assigned port (by using port 0) 3184 t = logging.config.listen(0, verify) 3185 t.start() 3186 t.ready.wait() 3187 # Now get the port allocated 3188 port = t.port 3189 t.ready.clear() 3190 try: 3191 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3192 sock.settimeout(2.0) 3193 sock.connect(('localhost', port)) 3194 3195 slen = struct.pack('>L', len(text)) 3196 s = slen + text 3197 sentsofar = 0 3198 left = len(s) 3199 while left > 0: 3200 sent = sock.send(s[sentsofar:]) 3201 sentsofar += sent 3202 left -= sent 3203 sock.close() 3204 finally: 3205 t.ready.wait(2.0) 3206 logging.config.stopListening() 3207 support.join_thread(t, 2.0) 3208 3209 def test_listen_config_10_ok(self): 3210 with support.captured_stdout() as output: 3211 self.setup_via_listener(json.dumps(self.config10)) 3212 logger = logging.getLogger("compiler.parser") 3213 logger.warning(self.next_message()) 3214 logger = logging.getLogger('compiler') 3215 # Not output, because filtered 3216 logger.warning(self.next_message()) 3217 logger = logging.getLogger('compiler.lexer') 3218 # Not output, because filtered 3219 logger.warning(self.next_message()) 3220 logger = logging.getLogger("compiler.parser.codegen") 3221 # Output, as not filtered 3222 logger.error(self.next_message()) 3223 self.assert_log_lines([ 3224 ('WARNING', '1'), 3225 ('ERROR', '4'), 3226 ], stream=output) 3227 3228 def test_listen_config_1_ok(self): 3229 with support.captured_stdout() as output: 3230 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 3231 logger = logging.getLogger("compiler.parser") 3232 # Both will output a message 3233 logger.info(self.next_message()) 3234 logger.error(self.next_message()) 3235 self.assert_log_lines([ 3236 ('INFO', '1'), 3237 ('ERROR', '2'), 3238 ], stream=output) 3239 # Original logger output is empty. 3240 self.assert_log_lines([]) 3241 3242 def test_listen_verify(self): 3243 3244 def verify_fail(stuff): 3245 return None 3246 3247 def verify_reverse(stuff): 3248 return stuff[::-1] 3249 3250 logger = logging.getLogger("compiler.parser") 3251 to_send = textwrap.dedent(ConfigFileTest.config1) 3252 # First, specify a verification function that will fail. 3253 # We expect to see no output, since our configuration 3254 # never took effect. 3255 with support.captured_stdout() as output: 3256 self.setup_via_listener(to_send, verify_fail) 3257 # Both will output a message 3258 logger.info(self.next_message()) 3259 logger.error(self.next_message()) 3260 self.assert_log_lines([], stream=output) 3261 # Original logger output has the stuff we logged. 3262 self.assert_log_lines([ 3263 ('INFO', '1'), 3264 ('ERROR', '2'), 3265 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3266 3267 # Now, perform no verification. Our configuration 3268 # should take effect. 
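# (logging.config.listen() uses the verify callable's return value as the
# configuration to apply: None rejects the data, transformed bytes replace
# it, and omitting verify applies the received bytes unchanged -- the case
# exercised next.)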
3269 3270 with support.captured_stdout() as output: 3271 self.setup_via_listener(to_send) # no verify callable specified 3272 logger = logging.getLogger("compiler.parser") 3273 # Both will output a message 3274 logger.info(self.next_message()) 3275 logger.error(self.next_message()) 3276 self.assert_log_lines([ 3277 ('INFO', '3'), 3278 ('ERROR', '4'), 3279 ], stream=output) 3280 # Original logger output still has the stuff we logged before. 3281 self.assert_log_lines([ 3282 ('INFO', '1'), 3283 ('ERROR', '2'), 3284 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3285 3286 # Now, perform verification which transforms the bytes. 3287 3288 with support.captured_stdout() as output: 3289 self.setup_via_listener(to_send[::-1], verify_reverse) 3290 logger = logging.getLogger("compiler.parser") 3291 # Both will output a message 3292 logger.info(self.next_message()) 3293 logger.error(self.next_message()) 3294 self.assert_log_lines([ 3295 ('INFO', '5'), 3296 ('ERROR', '6'), 3297 ], stream=output) 3298 # Original logger output still has the stuff we logged before. 3299 self.assert_log_lines([ 3300 ('INFO', '1'), 3301 ('ERROR', '2'), 3302 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3303 3304 def test_out_of_order(self): 3305 self.assertRaises(ValueError, self.apply_config, self.out_of_order) 3306 3307 def test_out_of_order_with_dollar_style(self): 3308 config = copy.deepcopy(self.out_of_order) 3309 config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}" 3310 3311 self.apply_config(config) 3312 handler = logging.getLogger('mymodule').handlers[0] 3313 self.assertIsInstance(handler.target, logging.Handler) 3314 self.assertIsInstance(handler.formatter._style, 3315 logging.StringTemplateStyle) 3316 3317 def test_custom_formatter_class_with_validate(self): 3318 self.apply_config(self.custom_formatter_class_validate) 3319 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3320 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3321 3322 def test_custom_formatter_class_with_validate2(self): 3323 self.apply_config(self.custom_formatter_class_validate2) 3324 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3325 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3326 3327 def test_custom_formatter_class_with_validate2_with_wrong_fmt(self): 3328 config = self.custom_formatter_class_validate.copy() 3329 config['formatters']['form1']['style'] = "$" 3330 3331 # Exception should not be raise as we have configured 'validate' to False 3332 self.apply_config(config) 3333 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3334 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3335 3336 def test_custom_formatter_class_with_validate3(self): 3337 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3) 3338 3339 def test_custom_formatter_function_with_validate(self): 3340 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function) 3341 3342 def test_baseconfig(self): 3343 d = { 3344 'atuple': (1, 2, 3), 3345 'alist': ['a', 'b', 'c'], 3346 'adict': {'d': 'e', 'f': 3 }, 3347 'nest1': ('g', ('h', 'i'), 'j'), 3348 'nest2': ['k', ['l', 'm'], 'n'], 3349 'nest3': ['o', 'cfg://alist', 'p'], 3350 } 3351 bc = logging.config.BaseConfigurator(d) 3352 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 3353 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 3354 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 3355 
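# cfg:// references support both mapping access ('cfg://adict.d' or
# 'cfg://adict[f]') and sequence indexing ('cfg://alist[1]'), and the two can
# be chained, as the remaining assertions show.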
self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3356 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3357 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3358 v = bc.convert('cfg://nest3') 3359 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3360 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3361 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3362 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3363 3364class ManagerTest(BaseTest): 3365 def test_manager_loggerclass(self): 3366 logged = [] 3367 3368 class MyLogger(logging.Logger): 3369 def _log(self, level, msg, args, exc_info=None, extra=None): 3370 logged.append(msg) 3371 3372 man = logging.Manager(None) 3373 self.assertRaises(TypeError, man.setLoggerClass, int) 3374 man.setLoggerClass(MyLogger) 3375 logger = man.getLogger('test') 3376 logger.warning('should appear in logged') 3377 logging.warning('should not appear in logged') 3378 3379 self.assertEqual(logged, ['should appear in logged']) 3380 3381 def test_set_log_record_factory(self): 3382 man = logging.Manager(None) 3383 expected = object() 3384 man.setLogRecordFactory(expected) 3385 self.assertEqual(man.logRecordFactory, expected) 3386 3387class ChildLoggerTest(BaseTest): 3388 def test_child_loggers(self): 3389 r = logging.getLogger() 3390 l1 = logging.getLogger('abc') 3391 l2 = logging.getLogger('def.ghi') 3392 c1 = r.getChild('xyz') 3393 c2 = r.getChild('uvw.xyz') 3394 self.assertIs(c1, logging.getLogger('xyz')) 3395 self.assertIs(c2, logging.getLogger('uvw.xyz')) 3396 c1 = l1.getChild('def') 3397 c2 = c1.getChild('ghi') 3398 c3 = l1.getChild('def.ghi') 3399 self.assertIs(c1, logging.getLogger('abc.def')) 3400 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 3401 self.assertIs(c2, c3) 3402 3403 3404class DerivedLogRecord(logging.LogRecord): 3405 pass 3406 3407class LogRecordFactoryTest(BaseTest): 3408 3409 def setUp(self): 3410 class CheckingFilter(logging.Filter): 3411 def __init__(self, cls): 3412 self.cls = cls 3413 3414 def filter(self, record): 3415 t = type(record) 3416 if t is not self.cls: 3417 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 3418 self.cls) 3419 raise TypeError(msg) 3420 return True 3421 3422 BaseTest.setUp(self) 3423 self.filter = CheckingFilter(DerivedLogRecord) 3424 self.root_logger.addFilter(self.filter) 3425 self.orig_factory = logging.getLogRecordFactory() 3426 3427 def tearDown(self): 3428 self.root_logger.removeFilter(self.filter) 3429 BaseTest.tearDown(self) 3430 logging.setLogRecordFactory(self.orig_factory) 3431 3432 def test_logrecord_class(self): 3433 self.assertRaises(TypeError, self.root_logger.warning, 3434 self.next_message()) 3435 logging.setLogRecordFactory(DerivedLogRecord) 3436 self.root_logger.error(self.next_message()) 3437 self.assert_log_lines([ 3438 ('root', 'ERROR', '2'), 3439 ]) 3440 3441 3442class QueueHandlerTest(BaseTest): 3443 # Do not bother with a logger name group. 
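# (the pattern below therefore captures only the level and the message).
# QueueHandler.prepare() formats the record, folding the arguments into
# record.msg and clearing args/exc_info before the record is enqueued, which
# is why these tests can inspect the queued records through msg alone.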
3444 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 3445 3446 def setUp(self): 3447 BaseTest.setUp(self) 3448 self.queue = queue.Queue(-1) 3449 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 3450 self.name = 'que' 3451 self.que_logger = logging.getLogger('que') 3452 self.que_logger.propagate = False 3453 self.que_logger.setLevel(logging.WARNING) 3454 self.que_logger.addHandler(self.que_hdlr) 3455 3456 def tearDown(self): 3457 self.que_hdlr.close() 3458 BaseTest.tearDown(self) 3459 3460 def test_queue_handler(self): 3461 self.que_logger.debug(self.next_message()) 3462 self.assertRaises(queue.Empty, self.queue.get_nowait) 3463 self.que_logger.info(self.next_message()) 3464 self.assertRaises(queue.Empty, self.queue.get_nowait) 3465 msg = self.next_message() 3466 self.que_logger.warning(msg) 3467 data = self.queue.get_nowait() 3468 self.assertTrue(isinstance(data, logging.LogRecord)) 3469 self.assertEqual(data.name, self.que_logger.name) 3470 self.assertEqual((data.msg, data.args), (msg, None)) 3471 3472 def test_formatting(self): 3473 msg = self.next_message() 3474 levelname = logging.getLevelName(logging.WARNING) 3475 log_format_str = '{name} -> {levelname}: {message}' 3476 formatted_msg = log_format_str.format(name=self.name, 3477 levelname=levelname, message=msg) 3478 formatter = logging.Formatter(self.log_format) 3479 self.que_hdlr.setFormatter(formatter) 3480 self.que_logger.warning(msg) 3481 log_record = self.queue.get_nowait() 3482 self.assertEqual(formatted_msg, log_record.msg) 3483 self.assertEqual(formatted_msg, log_record.message) 3484 3485 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3486 'logging.handlers.QueueListener required for this test') 3487 def test_queue_listener(self): 3488 handler = support.TestHandler(support.Matcher()) 3489 listener = logging.handlers.QueueListener(self.queue, handler) 3490 listener.start() 3491 try: 3492 self.que_logger.warning(self.next_message()) 3493 self.que_logger.error(self.next_message()) 3494 self.que_logger.critical(self.next_message()) 3495 finally: 3496 listener.stop() 3497 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 3498 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 3499 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 3500 handler.close() 3501 3502 # Now test with respect_handler_level set 3503 3504 handler = support.TestHandler(support.Matcher()) 3505 handler.setLevel(logging.CRITICAL) 3506 listener = logging.handlers.QueueListener(self.queue, handler, 3507 respect_handler_level=True) 3508 listener.start() 3509 try: 3510 self.que_logger.warning(self.next_message()) 3511 self.que_logger.error(self.next_message()) 3512 self.que_logger.critical(self.next_message()) 3513 finally: 3514 listener.stop() 3515 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 3516 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 3517 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 3518 handler.close() 3519 3520 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3521 'logging.handlers.QueueListener required for this test') 3522 def test_queue_listener_with_StreamHandler(self): 3523 # Test that traceback only appends once (bpo-34334). 
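# QueueHandler.prepare() renders the traceback into the message and then
# clears exc_info/exc_text, so when the listener re-emits the record through
# root_hdlr the traceback cannot be appended a second time.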
3524 listener = logging.handlers.QueueListener(self.queue, self.root_hdlr) 3525 listener.start() 3526 try: 3527 1 / 0 3528 except ZeroDivisionError as e: 3529 exc = e 3530 self.que_logger.exception(self.next_message(), exc_info=exc) 3531 listener.stop() 3532 self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1) 3533 3534 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3535 'logging.handlers.QueueListener required for this test') 3536 def test_queue_listener_with_multiple_handlers(self): 3537 # Test that queue handler format doesn't affect other handler formats (bpo-35726). 3538 self.que_hdlr.setFormatter(self.root_formatter) 3539 self.que_logger.addHandler(self.root_hdlr) 3540 3541 listener = logging.handlers.QueueListener(self.queue, self.que_hdlr) 3542 listener.start() 3543 self.que_logger.error("error") 3544 listener.stop() 3545 self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error") 3546 3547if hasattr(logging.handlers, 'QueueListener'): 3548 import multiprocessing 3549 from unittest.mock import patch 3550 3551 class QueueListenerTest(BaseTest): 3552 """ 3553 Tests based on patch submitted for issue #27930. Ensure that 3554 QueueListener handles all log messages. 3555 """ 3556 3557 repeat = 20 3558 3559 @staticmethod 3560 def setup_and_log(log_queue, ident): 3561 """ 3562 Creates a logger with a QueueHandler that logs to a queue read by a 3563 QueueListener. Starts the listener, logs five messages, and stops 3564 the listener. 3565 """ 3566 logger = logging.getLogger('test_logger_with_id_%s' % ident) 3567 logger.setLevel(logging.DEBUG) 3568 handler = logging.handlers.QueueHandler(log_queue) 3569 logger.addHandler(handler) 3570 listener = logging.handlers.QueueListener(log_queue) 3571 listener.start() 3572 3573 logger.info('one') 3574 logger.info('two') 3575 logger.info('three') 3576 logger.info('four') 3577 logger.info('five') 3578 3579 listener.stop() 3580 logger.removeHandler(handler) 3581 handler.close() 3582 3583 @patch.object(logging.handlers.QueueListener, 'handle') 3584 def test_handle_called_with_queue_queue(self, mock_handle): 3585 for i in range(self.repeat): 3586 log_queue = queue.Queue() 3587 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3588 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3589 'correct number of handled log messages') 3590 3591 @patch.object(logging.handlers.QueueListener, 'handle') 3592 def test_handle_called_with_mp_queue(self, mock_handle): 3593 # Issue 28668: The multiprocessing (mp) module is not functional 3594 # when the mp.synchronize module cannot be imported. 3595 support.import_module('multiprocessing.synchronize') 3596 for i in range(self.repeat): 3597 log_queue = multiprocessing.Queue() 3598 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3599 log_queue.close() 3600 log_queue.join_thread() 3601 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3602 'correct number of handled log messages') 3603 3604 @staticmethod 3605 def get_all_from_queue(log_queue): 3606 try: 3607 while True: 3608 yield log_queue.get_nowait() 3609 except queue.Empty: 3610 return [] 3611 3612 def test_no_messages_in_queue_after_stop(self): 3613 """ 3614 Five messages are logged then the QueueListener is stopped. This 3615 test then gets everything off the queue. Failure of this test 3616 indicates that messages were not registered on the queue until 3617 _after_ the QueueListener stopped. 
3618 """ 3619 # Issue 28668: The multiprocessing (mp) module is not functional 3620 # when the mp.synchronize module cannot be imported. 3621 support.import_module('multiprocessing.synchronize') 3622 for i in range(self.repeat): 3623 queue = multiprocessing.Queue() 3624 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3625 # time.sleep(1) 3626 items = list(self.get_all_from_queue(queue)) 3627 queue.close() 3628 queue.join_thread() 3629 3630 expected = [[], [logging.handlers.QueueListener._sentinel]] 3631 self.assertIn(items, expected, 3632 'Found unexpected messages in queue: %s' % ( 3633 [m.msg if isinstance(m, logging.LogRecord) 3634 else m for m in items])) 3635 3636 def test_calls_task_done_after_stop(self): 3637 # Issue 36813: Make sure queue.join does not deadlock. 3638 log_queue = queue.Queue() 3639 listener = logging.handlers.QueueListener(log_queue) 3640 listener.start() 3641 listener.stop() 3642 with self.assertRaises(ValueError): 3643 # Make sure all tasks are done and .join won't block. 3644 log_queue.task_done() 3645 3646 3647ZERO = datetime.timedelta(0) 3648 3649class UTC(datetime.tzinfo): 3650 def utcoffset(self, dt): 3651 return ZERO 3652 3653 dst = utcoffset 3654 3655 def tzname(self, dt): 3656 return 'UTC' 3657 3658utc = UTC() 3659 3660class FormatterTest(unittest.TestCase): 3661 def setUp(self): 3662 self.common = { 3663 'name': 'formatter.test', 3664 'level': logging.DEBUG, 3665 'pathname': os.path.join('path', 'to', 'dummy.ext'), 3666 'lineno': 42, 3667 'exc_info': None, 3668 'func': None, 3669 'msg': 'Message with %d %s', 3670 'args': (2, 'placeholders'), 3671 } 3672 self.variants = { 3673 } 3674 3675 def get_record(self, name=None): 3676 result = dict(self.common) 3677 if name is not None: 3678 result.update(self.variants[name]) 3679 return logging.makeLogRecord(result) 3680 3681 def assert_error_message(self, exception, message, *args, **kwargs): 3682 try: 3683 self.assertRaises(exception, *args, **kwargs) 3684 except exception as e: 3685 self.assertEqual(message, e.message) 3686 3687 def test_percent(self): 3688 # Test %-formatting 3689 r = self.get_record() 3690 f = logging.Formatter('${%(message)s}') 3691 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 3692 f = logging.Formatter('%(random)s') 3693 self.assertRaises(ValueError, f.format, r) 3694 self.assertFalse(f.usesTime()) 3695 f = logging.Formatter('%(asctime)s') 3696 self.assertTrue(f.usesTime()) 3697 f = logging.Formatter('%(asctime)-15s') 3698 self.assertTrue(f.usesTime()) 3699 f = logging.Formatter('%(asctime)#15s') 3700 self.assertTrue(f.usesTime()) 3701 3702 def test_braces(self): 3703 # Test {}-formatting 3704 r = self.get_record() 3705 f = logging.Formatter('$%{message}%$', style='{') 3706 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3707 f = logging.Formatter('{random}', style='{') 3708 self.assertRaises(ValueError, f.format, r) 3709 f = logging.Formatter("{message}", style='{') 3710 self.assertFalse(f.usesTime()) 3711 f = logging.Formatter('{asctime}', style='{') 3712 self.assertTrue(f.usesTime()) 3713 f = logging.Formatter('{asctime!s:15}', style='{') 3714 self.assertTrue(f.usesTime()) 3715 f = logging.Formatter('{asctime:15}', style='{') 3716 self.assertTrue(f.usesTime()) 3717 3718 def test_dollars(self): 3719 # Test $-formatting 3720 r = self.get_record() 3721 f = logging.Formatter('${message}', style='$') 3722 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3723 f = logging.Formatter('$message', style='$') 3724 self.assertEqual(f.format(r), 
'Message with 2 placeholders') 3725 f = logging.Formatter('$$%${message}%$$', style='$') 3726 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3727 f = logging.Formatter('${random}', style='$') 3728 self.assertRaises(ValueError, f.format, r) 3729 self.assertFalse(f.usesTime()) 3730 f = logging.Formatter('${asctime}', style='$') 3731 self.assertTrue(f.usesTime()) 3732 f = logging.Formatter('$asctime', style='$') 3733 self.assertTrue(f.usesTime()) 3734 f = logging.Formatter('${message}', style='$') 3735 self.assertFalse(f.usesTime()) 3736 f = logging.Formatter('${asctime}--', style='$') 3737 self.assertTrue(f.usesTime()) 3738 3739 def test_format_validate(self): 3740 # Check correct formatting 3741 # Percentage style 3742 f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3743 self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3744 f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3745 self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3746 f = logging.Formatter("%(process)#+027.23X") 3747 self.assertEqual(f._fmt, "%(process)#+027.23X") 3748 f = logging.Formatter("%(foo)#.*g") 3749 self.assertEqual(f._fmt, "%(foo)#.*g") 3750 3751 # StrFormat Style 3752 f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{") 3753 self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}") 3754 f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{") 3755 self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}") 3756 f = logging.Formatter("{customfield!s:#<30}", style="{") 3757 self.assertEqual(f._fmt, "{customfield!s:#<30}") 3758 f = logging.Formatter("{message!r}", style="{") 3759 self.assertEqual(f._fmt, "{message!r}") 3760 f = logging.Formatter("{message!s}", style="{") 3761 self.assertEqual(f._fmt, "{message!s}") 3762 f = logging.Formatter("{message!a}", style="{") 3763 self.assertEqual(f._fmt, "{message!a}") 3764 f = logging.Formatter("{process!r:4.2}", style="{") 3765 self.assertEqual(f._fmt, "{process!r:4.2}") 3766 f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{") 3767 self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}") 3768 f = logging.Formatter("{process!s:{w},.{p}}", style="{") 3769 self.assertEqual(f._fmt, "{process!s:{w},.{p}}") 3770 f = logging.Formatter("{foo:12.{p}}", style="{") 3771 self.assertEqual(f._fmt, "{foo:12.{p}}") 3772 f = logging.Formatter("{foo:{w}.6}", style="{") 3773 self.assertEqual(f._fmt, "{foo:{w}.6}") 3774 f = logging.Formatter("{foo[0].bar[1].baz}", style="{") 3775 self.assertEqual(f._fmt, "{foo[0].bar[1].baz}") 3776 f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{") 3777 self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}") 3778 f = logging.Formatter("{12[k1].bar[k2].baz}", style="{") 3779 self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}") 3780 3781 # Dollar style 3782 f = logging.Formatter("${asctime} - $message", style="$") 3783 self.assertEqual(f._fmt, "${asctime} - $message") 3784 f = logging.Formatter("$bar $$", style="$") 3785 self.assertEqual(f._fmt, "$bar $$") 3786 f = logging.Formatter("$bar $$$$", style="$") 3787 self.assertEqual(f._fmt, "$bar $$$$") # this would print two $($$) 3788 3789 # Testing when ValueError being raised from incorrect format 3790 # Percentage Style 3791 self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z") 3792 
self.assertRaises(ValueError, logging.Formatter, "%(asctime)b") 3793 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*") 3794 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s") 3795 self.assertRaises(ValueError, logging.Formatter, "%(asctime)_") 3796 self.assertRaises(ValueError, logging.Formatter, '{asctime}') 3797 self.assertRaises(ValueError, logging.Formatter, '${message}') 3798 self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f') # with both * and decimal number as precision 3799 self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f') 3800 3801 # StrFormat Style 3802 # Testing failure for '-' in field name 3803 self.assert_error_message( 3804 ValueError, 3805 "invalid field name/expression: 'name-thing'", 3806 logging.Formatter, "{name-thing}", style="{" 3807 ) 3808 # Testing failure for style mismatch 3809 self.assert_error_message( 3810 ValueError, 3811 "invalid format: no fields", 3812 logging.Formatter, '%(asctime)s', style='{' 3813 ) 3814 # Testing failure for invalid conversion 3815 self.assert_error_message( 3816 ValueError, 3817 "invalid conversion: 'Z'" 3818 ) 3819 self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{') 3820 self.assert_error_message( 3821 ValueError, 3822 "invalid format: expected ':' after conversion specifier", 3823 logging.Formatter, '{asctime!aa:15}', style='{' 3824 ) 3825 # Testing failure for invalid spec 3826 self.assert_error_message( 3827 ValueError, 3828 "bad specifier: '.2ff'", 3829 logging.Formatter, '{process:.2ff}', style='{' 3830 ) 3831 self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{') 3832 self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{') 3833 self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{') 3834 self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{') 3835 # Testing failure for mismatch braces 3836 self.assert_error_message( 3837 ValueError, 3838 "invalid format: unmatched '{' in format spec", 3839 logging.Formatter, '{process', style='{' 3840 ) 3841 self.assert_error_message( 3842 ValueError, 3843 "invalid format: unmatched '{' in format spec", 3844 logging.Formatter, 'process}', style='{' 3845 ) 3846 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{') 3847 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{') 3848 self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{') 3849 self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{') 3850 self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{') 3851 self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{') 3852 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{') 3853 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{') 3854 self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{') 3855 3856 # Dollar style 3857 # Testing failure for mismatch bare $ 3858 self.assert_error_message( 3859 ValueError, 3860 "invalid format: bare \'$\' not allowed", 3861 logging.Formatter, '$bar $$$', style='$' 3862 ) 3863 self.assert_error_message( 3864 ValueError, 3865 "invalid format: bare \'$\' not allowed", 3866 logging.Formatter, 'bar $', style='$' 3867 ) 3868 self.assert_error_message( 3869 ValueError, 3870 "invalid format: bare \'$\' not allowed", 3871 logging.Formatter, 'foo $.', style='$' 3872 ) 3873 # 
Testing failure for mismatch style 3874 self.assert_error_message( 3875 ValueError, 3876 "invalid format: no fields", 3877 logging.Formatter, '{asctime}', style='$' 3878 ) 3879 self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$') 3880 3881 # Testing failure for incorrect fields 3882 self.assert_error_message( 3883 ValueError, 3884 "invalid format: no fields", 3885 logging.Formatter, 'foo', style='$' 3886 ) 3887 self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$') 3888 3889 def test_invalid_style(self): 3890 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 3891 3892 def test_time(self): 3893 r = self.get_record() 3894 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 3895 # We use None to indicate we want the local timezone 3896 # We're essentially converting a UTC time to local time 3897 r.created = time.mktime(dt.astimezone(None).timetuple()) 3898 r.msecs = 123 3899 f = logging.Formatter('%(asctime)s %(message)s') 3900 f.converter = time.gmtime 3901 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 3902 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 3903 f.format(r) 3904 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 3905 3906class TestBufferingFormatter(logging.BufferingFormatter): 3907 def formatHeader(self, records): 3908 return '[(%d)' % len(records) 3909 3910 def formatFooter(self, records): 3911 return '(%d)]' % len(records) 3912 3913class BufferingFormatterTest(unittest.TestCase): 3914 def setUp(self): 3915 self.records = [ 3916 logging.makeLogRecord({'msg': 'one'}), 3917 logging.makeLogRecord({'msg': 'two'}), 3918 ] 3919 3920 def test_default(self): 3921 f = logging.BufferingFormatter() 3922 self.assertEqual('', f.format([])) 3923 self.assertEqual('onetwo', f.format(self.records)) 3924 3925 def test_custom(self): 3926 f = TestBufferingFormatter() 3927 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 3928 lf = logging.Formatter('<%(message)s>') 3929 f = TestBufferingFormatter(lf) 3930 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 3931 3932class ExceptionTest(BaseTest): 3933 def test_formatting(self): 3934 r = self.root_logger 3935 h = RecordingHandler() 3936 r.addHandler(h) 3937 try: 3938 raise RuntimeError('deliberate mistake') 3939 except: 3940 logging.exception('failed', stack_info=True) 3941 r.removeHandler(h) 3942 h.close() 3943 r = h.records[0] 3944 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 3945 'call last):\n')) 3946 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 3947 'deliberate mistake')) 3948 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 3949 'call last):\n')) 3950 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 3951 'stack_info=True)')) 3952 3953 3954class LastResortTest(BaseTest): 3955 def test_last_resort(self): 3956 # Test the last resort handler 3957 root = self.root_logger 3958 root.removeHandler(self.root_hdlr) 3959 old_lastresort = logging.lastResort 3960 old_raise_exceptions = logging.raiseExceptions 3961 3962 try: 3963 with support.captured_stderr() as stderr: 3964 root.debug('This should not appear') 3965 self.assertEqual(stderr.getvalue(), '') 3966 root.warning('Final chance!') 3967 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 3968 3969 # No handlers and no last resort, so 'No handlers' message 3970 logging.lastResort = None 3971 with support.captured_stderr() as stderr: 3972 root.warning('Final chance!') 3973 msg = 'No handlers could be found for logger "root"\n' 3974 
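                # (The one-shot warning is remembered on
                # root.manager.emittedNoHandlerWarning, which is why the next
                # capture stays empty and the flag has to be reset before the
                # raiseExceptions check further down.)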
self.assertEqual(stderr.getvalue(), msg) 3975 3976 # 'No handlers' message only printed once 3977 with support.captured_stderr() as stderr: 3978 root.warning('Final chance!') 3979 self.assertEqual(stderr.getvalue(), '') 3980 3981 # If raiseExceptions is False, no message is printed 3982 root.manager.emittedNoHandlerWarning = False 3983 logging.raiseExceptions = False 3984 with support.captured_stderr() as stderr: 3985 root.warning('Final chance!') 3986 self.assertEqual(stderr.getvalue(), '') 3987 finally: 3988 root.addHandler(self.root_hdlr) 3989 logging.lastResort = old_lastresort 3990 logging.raiseExceptions = old_raise_exceptions 3991 3992 3993class FakeHandler: 3994 3995 def __init__(self, identifier, called): 3996 for method in ('acquire', 'flush', 'close', 'release'): 3997 setattr(self, method, self.record_call(identifier, method, called)) 3998 3999 def record_call(self, identifier, method_name, called): 4000 def inner(): 4001 called.append('{} - {}'.format(identifier, method_name)) 4002 return inner 4003 4004 4005class RecordingHandler(logging.NullHandler): 4006 4007 def __init__(self, *args, **kwargs): 4008 super(RecordingHandler, self).__init__(*args, **kwargs) 4009 self.records = [] 4010 4011 def handle(self, record): 4012 """Keep track of all the emitted records.""" 4013 self.records.append(record) 4014 4015 4016class ShutdownTest(BaseTest): 4017 4018 """Test suite for the shutdown method.""" 4019 4020 def setUp(self): 4021 super(ShutdownTest, self).setUp() 4022 self.called = [] 4023 4024 raise_exceptions = logging.raiseExceptions 4025 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 4026 4027 def raise_error(self, error): 4028 def inner(): 4029 raise error() 4030 return inner 4031 4032 def test_no_failure(self): 4033 # create some fake handlers 4034 handler0 = FakeHandler(0, self.called) 4035 handler1 = FakeHandler(1, self.called) 4036 handler2 = FakeHandler(2, self.called) 4037 4038 # create live weakref to those handlers 4039 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 4040 4041 logging.shutdown(handlerList=list(handlers)) 4042 4043 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 4044 '1 - acquire', '1 - flush', '1 - close', '1 - release', 4045 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 4046 self.assertEqual(expected, self.called) 4047 4048 def _test_with_failure_in_method(self, method, error): 4049 handler = FakeHandler(0, self.called) 4050 setattr(handler, method, self.raise_error(error)) 4051 handlers = [logging.weakref.ref(handler)] 4052 4053 logging.shutdown(handlerList=list(handlers)) 4054 4055 self.assertEqual('0 - release', self.called[-1]) 4056 4057 def test_with_ioerror_in_acquire(self): 4058 self._test_with_failure_in_method('acquire', OSError) 4059 4060 def test_with_ioerror_in_flush(self): 4061 self._test_with_failure_in_method('flush', OSError) 4062 4063 def test_with_ioerror_in_close(self): 4064 self._test_with_failure_in_method('close', OSError) 4065 4066 def test_with_valueerror_in_acquire(self): 4067 self._test_with_failure_in_method('acquire', ValueError) 4068 4069 def test_with_valueerror_in_flush(self): 4070 self._test_with_failure_in_method('flush', ValueError) 4071 4072 def test_with_valueerror_in_close(self): 4073 self._test_with_failure_in_method('close', ValueError) 4074 4075 def test_with_other_error_in_acquire_without_raise(self): 4076 logging.raiseExceptions = False 4077 self._test_with_failure_in_method('acquire', IndexError) 4078 4079 def 
test_with_other_error_in_flush_without_raise(self): 4080 logging.raiseExceptions = False 4081 self._test_with_failure_in_method('flush', IndexError) 4082 4083 def test_with_other_error_in_close_without_raise(self): 4084 logging.raiseExceptions = False 4085 self._test_with_failure_in_method('close', IndexError) 4086 4087 def test_with_other_error_in_acquire_with_raise(self): 4088 logging.raiseExceptions = True 4089 self.assertRaises(IndexError, self._test_with_failure_in_method, 4090 'acquire', IndexError) 4091 4092 def test_with_other_error_in_flush_with_raise(self): 4093 logging.raiseExceptions = True 4094 self.assertRaises(IndexError, self._test_with_failure_in_method, 4095 'flush', IndexError) 4096 4097 def test_with_other_error_in_close_with_raise(self): 4098 logging.raiseExceptions = True 4099 self.assertRaises(IndexError, self._test_with_failure_in_method, 4100 'close', IndexError) 4101 4102 4103class ModuleLevelMiscTest(BaseTest): 4104 4105 """Test suite for some module level methods.""" 4106 4107 def test_disable(self): 4108 old_disable = logging.root.manager.disable 4109 # confirm our assumptions are correct 4110 self.assertEqual(old_disable, 0) 4111 self.addCleanup(logging.disable, old_disable) 4112 4113 logging.disable(83) 4114 self.assertEqual(logging.root.manager.disable, 83) 4115 4116 # test the default value introduced in 3.7 4117 # (Issue #28524) 4118 logging.disable() 4119 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 4120 4121 def _test_log(self, method, level=None): 4122 called = [] 4123 support.patch(self, logging, 'basicConfig', 4124 lambda *a, **kw: called.append((a, kw))) 4125 4126 recording = RecordingHandler() 4127 logging.root.addHandler(recording) 4128 4129 log_method = getattr(logging, method) 4130 if level is not None: 4131 log_method(level, "test me: %r", recording) 4132 else: 4133 log_method("test me: %r", recording) 4134 4135 self.assertEqual(len(recording.records), 1) 4136 record = recording.records[0] 4137 self.assertEqual(record.getMessage(), "test me: %r" % recording) 4138 4139 expected_level = level if level is not None else getattr(logging, method.upper()) 4140 self.assertEqual(record.levelno, expected_level) 4141 4142 # basicConfig was not called! 
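        # (the module-level convenience functions only fall back to
        # basicConfig() when the root logger has no handlers; the
        # RecordingHandler added above means the patched stub is never hit)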
4143 self.assertEqual(called, []) 4144 4145 def test_log(self): 4146 self._test_log('log', logging.ERROR) 4147 4148 def test_debug(self): 4149 self._test_log('debug') 4150 4151 def test_info(self): 4152 self._test_log('info') 4153 4154 def test_warning(self): 4155 self._test_log('warning') 4156 4157 def test_error(self): 4158 self._test_log('error') 4159 4160 def test_critical(self): 4161 self._test_log('critical') 4162 4163 def test_set_logger_class(self): 4164 self.assertRaises(TypeError, logging.setLoggerClass, object) 4165 4166 class MyLogger(logging.Logger): 4167 pass 4168 4169 logging.setLoggerClass(MyLogger) 4170 self.assertEqual(logging.getLoggerClass(), MyLogger) 4171 4172 logging.setLoggerClass(logging.Logger) 4173 self.assertEqual(logging.getLoggerClass(), logging.Logger) 4174 4175 def test_subclass_logger_cache(self): 4176 # bpo-37258 4177 message = [] 4178 4179 class MyLogger(logging.getLoggerClass()): 4180 def __init__(self, name='MyLogger', level=logging.NOTSET): 4181 super().__init__(name, level) 4182 message.append('initialized') 4183 4184 logging.setLoggerClass(MyLogger) 4185 logger = logging.getLogger('just_some_logger') 4186 self.assertEqual(message, ['initialized']) 4187 stream = io.StringIO() 4188 h = logging.StreamHandler(stream) 4189 logger.addHandler(h) 4190 try: 4191 logger.setLevel(logging.DEBUG) 4192 logger.debug("hello") 4193 self.assertEqual(stream.getvalue().strip(), "hello") 4194 4195 stream.truncate(0) 4196 stream.seek(0) 4197 4198 logger.setLevel(logging.INFO) 4199 logger.debug("hello") 4200 self.assertEqual(stream.getvalue(), "") 4201 finally: 4202 logger.removeHandler(h) 4203 h.close() 4204 logging.setLoggerClass(logging.Logger) 4205 4206 @support.requires_type_collecting 4207 def test_logging_at_shutdown(self): 4208 # Issue #20037 4209 code = """if 1: 4210 import logging 4211 4212 class A: 4213 def __del__(self): 4214 try: 4215 raise ValueError("some error") 4216 except Exception: 4217 logging.exception("exception in __del__") 4218 4219 a = A()""" 4220 rc, out, err = assert_python_ok("-c", code) 4221 err = err.decode() 4222 self.assertIn("exception in __del__", err) 4223 self.assertIn("ValueError: some error", err) 4224 4225 def test_recursion_error(self): 4226 # Issue 36272 4227 code = """if 1: 4228 import logging 4229 4230 def rec(): 4231 logging.error("foo") 4232 rec() 4233 4234 rec()""" 4235 rc, out, err = assert_python_failure("-c", code) 4236 err = err.decode() 4237 self.assertNotIn("Cannot recover from stack overflow.", err) 4238 self.assertEqual(rc, 1) 4239 4240 4241class LogRecordTest(BaseTest): 4242 def test_str_rep(self): 4243 r = logging.makeLogRecord({}) 4244 s = str(r) 4245 self.assertTrue(s.startswith('<LogRecord: ')) 4246 self.assertTrue(s.endswith('>')) 4247 4248 def test_dict_arg(self): 4249 h = RecordingHandler() 4250 r = logging.getLogger() 4251 r.addHandler(h) 4252 d = {'less' : 'more' } 4253 logging.warning('less is %(less)s', d) 4254 self.assertIs(h.records[0].args, d) 4255 self.assertEqual(h.records[0].message, 'less is more') 4256 r.removeHandler(h) 4257 h.close() 4258 4259 def test_multiprocessing(self): 4260 r = logging.makeLogRecord({}) 4261 self.assertEqual(r.processName, 'MainProcess') 4262 try: 4263 import multiprocessing as mp 4264 r = logging.makeLogRecord({}) 4265 self.assertEqual(r.processName, mp.current_process().name) 4266 except ImportError: 4267 pass 4268 4269 def test_optional(self): 4270 r = logging.makeLogRecord({}) 4271 NOT_NONE = self.assertIsNotNone 4272 NOT_NONE(r.thread) 4273 NOT_NONE(r.threadName) 4274 
NOT_NONE(r.process) 4275 NOT_NONE(r.processName) 4276 log_threads = logging.logThreads 4277 log_processes = logging.logProcesses 4278 log_multiprocessing = logging.logMultiprocessing 4279 try: 4280 logging.logThreads = False 4281 logging.logProcesses = False 4282 logging.logMultiprocessing = False 4283 r = logging.makeLogRecord({}) 4284 NONE = self.assertIsNone 4285 NONE(r.thread) 4286 NONE(r.threadName) 4287 NONE(r.process) 4288 NONE(r.processName) 4289 finally: 4290 logging.logThreads = log_threads 4291 logging.logProcesses = log_processes 4292 logging.logMultiprocessing = log_multiprocessing 4293 4294class BasicConfigTest(unittest.TestCase): 4295 4296 """Test suite for logging.basicConfig.""" 4297 4298 def setUp(self): 4299 super(BasicConfigTest, self).setUp() 4300 self.handlers = logging.root.handlers 4301 self.saved_handlers = logging._handlers.copy() 4302 self.saved_handler_list = logging._handlerList[:] 4303 self.original_logging_level = logging.root.level 4304 self.addCleanup(self.cleanup) 4305 logging.root.handlers = [] 4306 4307 def tearDown(self): 4308 for h in logging.root.handlers[:]: 4309 logging.root.removeHandler(h) 4310 h.close() 4311 super(BasicConfigTest, self).tearDown() 4312 4313 def cleanup(self): 4314 setattr(logging.root, 'handlers', self.handlers) 4315 logging._handlers.clear() 4316 logging._handlers.update(self.saved_handlers) 4317 logging._handlerList[:] = self.saved_handler_list 4318 logging.root.setLevel(self.original_logging_level) 4319 4320 def test_no_kwargs(self): 4321 logging.basicConfig() 4322 4323 # handler defaults to a StreamHandler to sys.stderr 4324 self.assertEqual(len(logging.root.handlers), 1) 4325 handler = logging.root.handlers[0] 4326 self.assertIsInstance(handler, logging.StreamHandler) 4327 self.assertEqual(handler.stream, sys.stderr) 4328 4329 formatter = handler.formatter 4330 # format defaults to logging.BASIC_FORMAT 4331 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 4332 # datefmt defaults to None 4333 self.assertIsNone(formatter.datefmt) 4334 # style defaults to % 4335 self.assertIsInstance(formatter._style, logging.PercentStyle) 4336 4337 # level is not explicitly set 4338 self.assertEqual(logging.root.level, self.original_logging_level) 4339 4340 def test_strformatstyle(self): 4341 with support.captured_stdout() as output: 4342 logging.basicConfig(stream=sys.stdout, style="{") 4343 logging.error("Log an error") 4344 sys.stdout.seek(0) 4345 self.assertEqual(output.getvalue().strip(), 4346 "ERROR:root:Log an error") 4347 4348 def test_stringtemplatestyle(self): 4349 with support.captured_stdout() as output: 4350 logging.basicConfig(stream=sys.stdout, style="$") 4351 logging.error("Log an error") 4352 sys.stdout.seek(0) 4353 self.assertEqual(output.getvalue().strip(), 4354 "ERROR:root:Log an error") 4355 4356 def test_filename(self): 4357 4358 def cleanup(h1, h2, fn): 4359 h1.close() 4360 h2.close() 4361 os.remove(fn) 4362 4363 logging.basicConfig(filename='test.log') 4364 4365 self.assertEqual(len(logging.root.handlers), 1) 4366 handler = logging.root.handlers[0] 4367 self.assertIsInstance(handler, logging.FileHandler) 4368 4369 expected = logging.FileHandler('test.log', 'a') 4370 self.assertEqual(handler.stream.mode, expected.stream.mode) 4371 self.assertEqual(handler.stream.name, expected.stream.name) 4372 self.addCleanup(cleanup, handler, expected, 'test.log') 4373 4374 def test_filemode(self): 4375 4376 def cleanup(h1, h2, fn): 4377 h1.close() 4378 h2.close() 4379 os.remove(fn) 4380 4381 
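        # filemode is only honoured together with filename; basicConfig()
        # forwards it as the FileHandler mode (the default being 'a').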
logging.basicConfig(filename='test.log', filemode='wb') 4382 4383 handler = logging.root.handlers[0] 4384 expected = logging.FileHandler('test.log', 'wb') 4385 self.assertEqual(handler.stream.mode, expected.stream.mode) 4386 self.addCleanup(cleanup, handler, expected, 'test.log') 4387 4388 def test_stream(self): 4389 stream = io.StringIO() 4390 self.addCleanup(stream.close) 4391 logging.basicConfig(stream=stream) 4392 4393 self.assertEqual(len(logging.root.handlers), 1) 4394 handler = logging.root.handlers[0] 4395 self.assertIsInstance(handler, logging.StreamHandler) 4396 self.assertEqual(handler.stream, stream) 4397 4398 def test_format(self): 4399 logging.basicConfig(format='%(asctime)s - %(message)s') 4400 4401 formatter = logging.root.handlers[0].formatter 4402 self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s') 4403 4404 def test_datefmt(self): 4405 logging.basicConfig(datefmt='bar') 4406 4407 formatter = logging.root.handlers[0].formatter 4408 self.assertEqual(formatter.datefmt, 'bar') 4409 4410 def test_style(self): 4411 logging.basicConfig(style='$') 4412 4413 formatter = logging.root.handlers[0].formatter 4414 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 4415 4416 def test_level(self): 4417 old_level = logging.root.level 4418 self.addCleanup(logging.root.setLevel, old_level) 4419 4420 logging.basicConfig(level=57) 4421 self.assertEqual(logging.root.level, 57) 4422 # Test that second call has no effect 4423 logging.basicConfig(level=58) 4424 self.assertEqual(logging.root.level, 57) 4425 4426 def test_incompatible(self): 4427 assertRaises = self.assertRaises 4428 handlers = [logging.StreamHandler()] 4429 stream = sys.stderr 4430 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4431 stream=stream) 4432 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4433 handlers=handlers) 4434 assertRaises(ValueError, logging.basicConfig, stream=stream, 4435 handlers=handlers) 4436 # Issue 23207: test for invalid kwargs 4437 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 4438 # Should pop both filename and filemode even if filename is None 4439 logging.basicConfig(filename=None, filemode='a') 4440 4441 def test_handlers(self): 4442 handlers = [ 4443 logging.StreamHandler(), 4444 logging.StreamHandler(sys.stdout), 4445 logging.StreamHandler(), 4446 ] 4447 f = logging.Formatter() 4448 handlers[2].setFormatter(f) 4449 logging.basicConfig(handlers=handlers) 4450 self.assertIs(handlers[0], logging.root.handlers[0]) 4451 self.assertIs(handlers[1], logging.root.handlers[1]) 4452 self.assertIs(handlers[2], logging.root.handlers[2]) 4453 self.assertIsNotNone(handlers[0].formatter) 4454 self.assertIsNotNone(handlers[1].formatter) 4455 self.assertIs(handlers[2].formatter, f) 4456 self.assertIs(handlers[0].formatter, handlers[1].formatter) 4457 4458 def test_force(self): 4459 old_string_io = io.StringIO() 4460 new_string_io = io.StringIO() 4461 old_handlers = [logging.StreamHandler(old_string_io)] 4462 new_handlers = [logging.StreamHandler(new_string_io)] 4463 logging.basicConfig(level=logging.WARNING, handlers=old_handlers) 4464 logging.warning('warn') 4465 logging.info('info') 4466 logging.debug('debug') 4467 self.assertEqual(len(logging.root.handlers), 1) 4468 logging.basicConfig(level=logging.INFO, handlers=new_handlers, 4469 force=True) 4470 logging.warning('warn') 4471 logging.info('info') 4472 logging.debug('debug') 4473 self.assertEqual(len(logging.root.handlers), 1) 4474 
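        # force=True closed and removed the old handler before installing the
        # new one, so only the first 'warn' reached old_string_io; the second
        # configuration also lowered the level to INFO, so new_string_io should
        # now see both 'warn' and 'info' (but still not 'debug').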
self.assertEqual(old_string_io.getvalue().strip(), 4475 'WARNING:root:warn') 4476 self.assertEqual(new_string_io.getvalue().strip(), 4477 'WARNING:root:warn\nINFO:root:info') 4478 4479 def _test_log(self, method, level=None): 4480 # logging.root has no handlers so basicConfig should be called 4481 called = [] 4482 4483 old_basic_config = logging.basicConfig 4484 def my_basic_config(*a, **kw): 4485 old_basic_config() 4486 old_level = logging.root.level 4487 logging.root.setLevel(100) # avoid having messages in stderr 4488 self.addCleanup(logging.root.setLevel, old_level) 4489 called.append((a, kw)) 4490 4491 support.patch(self, logging, 'basicConfig', my_basic_config) 4492 4493 log_method = getattr(logging, method) 4494 if level is not None: 4495 log_method(level, "test me") 4496 else: 4497 log_method("test me") 4498 4499 # basicConfig was called with no arguments 4500 self.assertEqual(called, [((), {})]) 4501 4502 def test_log(self): 4503 self._test_log('log', logging.WARNING) 4504 4505 def test_debug(self): 4506 self._test_log('debug') 4507 4508 def test_info(self): 4509 self._test_log('info') 4510 4511 def test_warning(self): 4512 self._test_log('warning') 4513 4514 def test_error(self): 4515 self._test_log('error') 4516 4517 def test_critical(self): 4518 self._test_log('critical') 4519 4520 4521class LoggerAdapterTest(unittest.TestCase): 4522 def setUp(self): 4523 super(LoggerAdapterTest, self).setUp() 4524 old_handler_list = logging._handlerList[:] 4525 4526 self.recording = RecordingHandler() 4527 self.logger = logging.root 4528 self.logger.addHandler(self.recording) 4529 self.addCleanup(self.logger.removeHandler, self.recording) 4530 self.addCleanup(self.recording.close) 4531 4532 def cleanup(): 4533 logging._handlerList[:] = old_handler_list 4534 4535 self.addCleanup(cleanup) 4536 self.addCleanup(logging.shutdown) 4537 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 4538 4539 def test_exception(self): 4540 msg = 'testing exception: %r' 4541 exc = None 4542 try: 4543 1 / 0 4544 except ZeroDivisionError as e: 4545 exc = e 4546 self.adapter.exception(msg, self.recording) 4547 4548 self.assertEqual(len(self.recording.records), 1) 4549 record = self.recording.records[0] 4550 self.assertEqual(record.levelno, logging.ERROR) 4551 self.assertEqual(record.msg, msg) 4552 self.assertEqual(record.args, (self.recording,)) 4553 self.assertEqual(record.exc_info, 4554 (exc.__class__, exc, exc.__traceback__)) 4555 4556 def test_exception_excinfo(self): 4557 try: 4558 1 / 0 4559 except ZeroDivisionError as e: 4560 exc = e 4561 4562 self.adapter.exception('exc_info test', exc_info=exc) 4563 4564 self.assertEqual(len(self.recording.records), 1) 4565 record = self.recording.records[0] 4566 self.assertEqual(record.exc_info, 4567 (exc.__class__, exc, exc.__traceback__)) 4568 4569 def test_critical(self): 4570 msg = 'critical test! 
%r' 4571 self.adapter.critical(msg, self.recording) 4572 4573 self.assertEqual(len(self.recording.records), 1) 4574 record = self.recording.records[0] 4575 self.assertEqual(record.levelno, logging.CRITICAL) 4576 self.assertEqual(record.msg, msg) 4577 self.assertEqual(record.args, (self.recording,)) 4578 4579 def test_is_enabled_for(self): 4580 old_disable = self.adapter.logger.manager.disable 4581 self.adapter.logger.manager.disable = 33 4582 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 4583 old_disable) 4584 self.assertFalse(self.adapter.isEnabledFor(32)) 4585 4586 def test_has_handlers(self): 4587 self.assertTrue(self.adapter.hasHandlers()) 4588 4589 for handler in self.logger.handlers: 4590 self.logger.removeHandler(handler) 4591 4592 self.assertFalse(self.logger.hasHandlers()) 4593 self.assertFalse(self.adapter.hasHandlers()) 4594 4595 def test_nested(self): 4596 class Adapter(logging.LoggerAdapter): 4597 prefix = 'Adapter' 4598 4599 def process(self, msg, kwargs): 4600 return f"{self.prefix} {msg}", kwargs 4601 4602 msg = 'Adapters can be nested, yo.' 4603 adapter = Adapter(logger=self.logger, extra=None) 4604 adapter_adapter = Adapter(logger=adapter, extra=None) 4605 adapter_adapter.prefix = 'AdapterAdapter' 4606 self.assertEqual(repr(adapter), repr(adapter_adapter)) 4607 adapter_adapter.log(logging.CRITICAL, msg, self.recording) 4608 self.assertEqual(len(self.recording.records), 1) 4609 record = self.recording.records[0] 4610 self.assertEqual(record.levelno, logging.CRITICAL) 4611 self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}") 4612 self.assertEqual(record.args, (self.recording,)) 4613 orig_manager = adapter_adapter.manager 4614 self.assertIs(adapter.manager, orig_manager) 4615 self.assertIs(self.logger.manager, orig_manager) 4616 temp_manager = object() 4617 try: 4618 adapter_adapter.manager = temp_manager 4619 self.assertIs(adapter_adapter.manager, temp_manager) 4620 self.assertIs(adapter.manager, temp_manager) 4621 self.assertIs(self.logger.manager, temp_manager) 4622 finally: 4623 adapter_adapter.manager = orig_manager 4624 self.assertIs(adapter_adapter.manager, orig_manager) 4625 self.assertIs(adapter.manager, orig_manager) 4626 self.assertIs(self.logger.manager, orig_manager) 4627 4628 4629class LoggerTest(BaseTest): 4630 4631 def setUp(self): 4632 super(LoggerTest, self).setUp() 4633 self.recording = RecordingHandler() 4634 self.logger = logging.Logger(name='blah') 4635 self.logger.addHandler(self.recording) 4636 self.addCleanup(self.logger.removeHandler, self.recording) 4637 self.addCleanup(self.recording.close) 4638 self.addCleanup(logging.shutdown) 4639 4640 def test_set_invalid_level(self): 4641 self.assertRaises(TypeError, self.logger.setLevel, object()) 4642 4643 def test_exception(self): 4644 msg = 'testing exception: %r' 4645 exc = None 4646 try: 4647 1 / 0 4648 except ZeroDivisionError as e: 4649 exc = e 4650 self.logger.exception(msg, self.recording) 4651 4652 self.assertEqual(len(self.recording.records), 1) 4653 record = self.recording.records[0] 4654 self.assertEqual(record.levelno, logging.ERROR) 4655 self.assertEqual(record.msg, msg) 4656 self.assertEqual(record.args, (self.recording,)) 4657 self.assertEqual(record.exc_info, 4658 (exc.__class__, exc, exc.__traceback__)) 4659 4660 def test_log_invalid_level_with_raise(self): 4661 with support.swap_attr(logging, 'raiseExceptions', True): 4662 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 4663 4664 def test_log_invalid_level_no_raise(self): 4665 with 
support.swap_attr(logging, 'raiseExceptions', False): 4666 self.logger.log('10', 'test message') # no exception happens 4667 4668 def test_find_caller_with_stack_info(self): 4669 called = [] 4670 support.patch(self, logging.traceback, 'print_stack', 4671 lambda f, file: called.append(file.getvalue())) 4672 4673 self.logger.findCaller(stack_info=True) 4674 4675 self.assertEqual(len(called), 1) 4676 self.assertEqual('Stack (most recent call last):\n', called[0]) 4677 4678 def test_find_caller_with_stacklevel(self): 4679 the_level = 1 4680 4681 def innermost(): 4682 self.logger.warning('test', stacklevel=the_level) 4683 4684 def inner(): 4685 innermost() 4686 4687 def outer(): 4688 inner() 4689 4690 records = self.recording.records 4691 outer() 4692 self.assertEqual(records[-1].funcName, 'innermost') 4693 lineno = records[-1].lineno 4694 the_level += 1 4695 outer() 4696 self.assertEqual(records[-1].funcName, 'inner') 4697 self.assertGreater(records[-1].lineno, lineno) 4698 lineno = records[-1].lineno 4699 the_level += 1 4700 outer() 4701 self.assertEqual(records[-1].funcName, 'outer') 4702 self.assertGreater(records[-1].lineno, lineno) 4703 lineno = records[-1].lineno 4704 the_level += 1 4705 outer() 4706 self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') 4707 self.assertGreater(records[-1].lineno, lineno) 4708 4709 def test_make_record_with_extra_overwrite(self): 4710 name = 'my record' 4711 level = 13 4712 fn = lno = msg = args = exc_info = func = sinfo = None 4713 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 4714 exc_info, func, sinfo) 4715 4716 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 4717 extra = {key: 'some value'} 4718 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 4719 fn, lno, msg, args, exc_info, 4720 extra=extra, sinfo=sinfo) 4721 4722 def test_make_record_with_extra_no_overwrite(self): 4723 name = 'my record' 4724 level = 13 4725 fn = lno = msg = args = exc_info = func = sinfo = None 4726 extra = {'valid_key': 'some value'} 4727 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 4728 exc_info, extra=extra, sinfo=sinfo) 4729 self.assertIn('valid_key', result.__dict__) 4730 4731 def test_has_handlers(self): 4732 self.assertTrue(self.logger.hasHandlers()) 4733 4734 for handler in self.logger.handlers: 4735 self.logger.removeHandler(handler) 4736 self.assertFalse(self.logger.hasHandlers()) 4737 4738 def test_has_handlers_no_propagate(self): 4739 child_logger = logging.getLogger('blah.child') 4740 child_logger.propagate = False 4741 self.assertFalse(child_logger.hasHandlers()) 4742 4743 def test_is_enabled_for(self): 4744 old_disable = self.logger.manager.disable 4745 self.logger.manager.disable = 23 4746 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 4747 self.assertFalse(self.logger.isEnabledFor(22)) 4748 4749 def test_is_enabled_for_disabled_logger(self): 4750 old_disabled = self.logger.disabled 4751 old_disable = self.logger.manager.disable 4752 4753 self.logger.disabled = True 4754 self.logger.manager.disable = 21 4755 4756 self.addCleanup(setattr, self.logger, 'disabled', old_disabled) 4757 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 4758 4759 self.assertFalse(self.logger.isEnabledFor(22)) 4760 4761 def test_root_logger_aliases(self): 4762 root = logging.getLogger() 4763 self.assertIs(root, logging.root) 4764 self.assertIs(root, logging.getLogger(None)) 4765 self.assertIs(root, logging.getLogger('')) 4766 self.assertIs(root, 
logging.getLogger('foo').root) 4767 self.assertIs(root, logging.getLogger('foo.bar').root) 4768 self.assertIs(root, logging.getLogger('foo').parent) 4769 4770 self.assertIsNot(root, logging.getLogger('\0')) 4771 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 4772 4773 def test_invalid_names(self): 4774 self.assertRaises(TypeError, logging.getLogger, any) 4775 self.assertRaises(TypeError, logging.getLogger, b'foo') 4776 4777 def test_pickling(self): 4778 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4779 for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'): 4780 logger = logging.getLogger(name) 4781 s = pickle.dumps(logger, proto) 4782 unpickled = pickle.loads(s) 4783 self.assertIs(unpickled, logger) 4784 4785 def test_caching(self): 4786 root = self.root_logger 4787 logger1 = logging.getLogger("abc") 4788 logger2 = logging.getLogger("abc.def") 4789 4790 # Set root logger level and ensure cache is empty 4791 root.setLevel(logging.ERROR) 4792 self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR) 4793 self.assertEqual(logger2._cache, {}) 4794 4795 # Ensure cache is populated and calls are consistent 4796 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 4797 self.assertFalse(logger2.isEnabledFor(logging.DEBUG)) 4798 self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False}) 4799 self.assertEqual(root._cache, {}) 4800 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 4801 4802 # Ensure root cache gets populated 4803 self.assertEqual(root._cache, {}) 4804 self.assertTrue(root.isEnabledFor(logging.ERROR)) 4805 self.assertEqual(root._cache, {logging.ERROR: True}) 4806 4807 # Set parent logger level and ensure caches are emptied 4808 logger1.setLevel(logging.CRITICAL) 4809 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4810 self.assertEqual(logger2._cache, {}) 4811 4812 # Ensure logger2 uses parent logger's effective level 4813 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 4814 4815 # Set level to NOTSET and ensure caches are empty 4816 logger2.setLevel(logging.NOTSET) 4817 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4818 self.assertEqual(logger2._cache, {}) 4819 self.assertEqual(logger1._cache, {}) 4820 self.assertEqual(root._cache, {}) 4821 4822 # Verify logger2 follows parent and not root 4823 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 4824 self.assertTrue(logger2.isEnabledFor(logging.CRITICAL)) 4825 self.assertFalse(logger1.isEnabledFor(logging.ERROR)) 4826 self.assertTrue(logger1.isEnabledFor(logging.CRITICAL)) 4827 self.assertTrue(root.isEnabledFor(logging.ERROR)) 4828 4829 # Disable logging in manager and ensure caches are clear 4830 logging.disable() 4831 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4832 self.assertEqual(logger2._cache, {}) 4833 self.assertEqual(logger1._cache, {}) 4834 self.assertEqual(root._cache, {}) 4835 4836 # Ensure no loggers are enabled 4837 self.assertFalse(logger1.isEnabledFor(logging.CRITICAL)) 4838 self.assertFalse(logger2.isEnabledFor(logging.CRITICAL)) 4839 self.assertFalse(root.isEnabledFor(logging.CRITICAL)) 4840 4841 4842class BaseFileTest(BaseTest): 4843 "Base class for handler tests that write log files" 4844 4845 def setUp(self): 4846 BaseTest.setUp(self) 4847 fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") 4848 os.close(fd) 4849 self.rmfiles = [] 4850 4851 def tearDown(self): 4852 for fn in self.rmfiles: 4853 os.unlink(fn) 4854 if os.path.exists(self.fn): 4855 os.unlink(self.fn) 4856 BaseTest.tearDown(self) 4857 4858 def 
assertLogFile(self, filename): 4859 "Assert a log file is there and register it for deletion" 4860 self.assertTrue(os.path.exists(filename), 4861 msg="Log file %r does not exist" % filename) 4862 self.rmfiles.append(filename) 4863 4864 4865class FileHandlerTest(BaseFileTest): 4866 def test_delay(self): 4867 os.unlink(self.fn) 4868 fh = logging.FileHandler(self.fn, delay=True) 4869 self.assertIsNone(fh.stream) 4870 self.assertFalse(os.path.exists(self.fn)) 4871 fh.handle(logging.makeLogRecord({})) 4872 self.assertIsNotNone(fh.stream) 4873 self.assertTrue(os.path.exists(self.fn)) 4874 fh.close() 4875 4876class RotatingFileHandlerTest(BaseFileTest): 4877 def next_rec(self): 4878 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 4879 self.next_message(), None, None, None) 4880 4881 def test_should_not_rollover(self): 4882 # If maxbytes is zero rollover never occurs 4883 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) 4884 self.assertFalse(rh.shouldRollover(None)) 4885 rh.close() 4886 4887 def test_should_rollover(self): 4888 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) 4889 self.assertTrue(rh.shouldRollover(self.next_rec())) 4890 rh.close() 4891 4892 def test_file_created(self): 4893 # checks that the file is created and assumes it was created 4894 # by us 4895 rh = logging.handlers.RotatingFileHandler(self.fn) 4896 rh.emit(self.next_rec()) 4897 self.assertLogFile(self.fn) 4898 rh.close() 4899 4900 def test_rollover_filenames(self): 4901 def namer(name): 4902 return name + ".test" 4903 rh = logging.handlers.RotatingFileHandler( 4904 self.fn, backupCount=2, maxBytes=1) 4905 rh.namer = namer 4906 rh.emit(self.next_rec()) 4907 self.assertLogFile(self.fn) 4908 rh.emit(self.next_rec()) 4909 self.assertLogFile(namer(self.fn + ".1")) 4910 rh.emit(self.next_rec()) 4911 self.assertLogFile(namer(self.fn + ".2")) 4912 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4913 rh.close() 4914 4915 @support.requires_zlib 4916 def test_rotator(self): 4917 def namer(name): 4918 return name + ".gz" 4919 4920 def rotator(source, dest): 4921 with open(source, "rb") as sf: 4922 data = sf.read() 4923 compressed = zlib.compress(data, 9) 4924 with open(dest, "wb") as df: 4925 df.write(compressed) 4926 os.remove(source) 4927 4928 rh = logging.handlers.RotatingFileHandler( 4929 self.fn, backupCount=2, maxBytes=1) 4930 rh.rotator = rotator 4931 rh.namer = namer 4932 m1 = self.next_rec() 4933 rh.emit(m1) 4934 self.assertLogFile(self.fn) 4935 m2 = self.next_rec() 4936 rh.emit(m2) 4937 fn = namer(self.fn + ".1") 4938 self.assertLogFile(fn) 4939 newline = os.linesep 4940 with open(fn, "rb") as f: 4941 compressed = f.read() 4942 data = zlib.decompress(compressed) 4943 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4944 rh.emit(self.next_rec()) 4945 fn = namer(self.fn + ".2") 4946 self.assertLogFile(fn) 4947 with open(fn, "rb") as f: 4948 compressed = f.read() 4949 data = zlib.decompress(compressed) 4950 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4951 rh.emit(self.next_rec()) 4952 fn = namer(self.fn + ".2") 4953 with open(fn, "rb") as f: 4954 compressed = f.read() 4955 data = zlib.decompress(compressed) 4956 self.assertEqual(data.decode("ascii"), m2.msg + newline) 4957 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4958 rh.close() 4959 4960class TimedRotatingFileHandlerTest(BaseFileTest): 4961 # other test methods added below 4962 def test_rollover(self): 4963 fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S', 4964 backupCount=1) 4965 fmt = 
logging.Formatter('%(asctime)s %(message)s') 4966 fh.setFormatter(fmt) 4967 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 4968 fh.emit(r1) 4969 self.assertLogFile(self.fn) 4970 time.sleep(1.1) # a little over a second ... 4971 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 4972 fh.emit(r2) 4973 fh.close() 4974 # At this point, we should have a recent rotated file which we 4975 # can test for the existence of. However, in practice, on some 4976 # machines which run really slowly, we don't know how far back 4977 # in time to go to look for the log file. So, we go back a fair 4978 # bit, and stop as soon as we see a rotated file. In theory this 4979 # could of course still fail, but the chances are lower. 4980 found = False 4981 now = datetime.datetime.now() 4982 GO_BACK = 5 * 60 # seconds 4983 for secs in range(GO_BACK): 4984 prev = now - datetime.timedelta(seconds=secs) 4985 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 4986 found = os.path.exists(fn) 4987 if found: 4988 self.rmfiles.append(fn) 4989 break 4990 msg = 'No rotated files found, went back %d seconds' % GO_BACK 4991 if not found: 4992 # print additional diagnostics 4993 dn, fn = os.path.split(self.fn) 4994 files = [f for f in os.listdir(dn) if f.startswith(fn)] 4995 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 4996 print('The only matching files are: %s' % files, file=sys.stderr) 4997 for f in files: 4998 print('Contents of %s:' % f) 4999 path = os.path.join(dn, f) 5000 with open(path, 'r') as tf: 5001 print(tf.read()) 5002 self.assertTrue(found, msg=msg) 5003 5004 def test_invalid(self): 5005 assertRaises = self.assertRaises 5006 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5007 self.fn, 'X', delay=True) 5008 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5009 self.fn, 'W', delay=True) 5010 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5011 self.fn, 'W7', delay=True) 5012 5013 def test_compute_rollover_daily_attime(self): 5014 currentTime = 0 5015 atTime = datetime.time(12, 0, 0) 5016 rh = logging.handlers.TimedRotatingFileHandler( 5017 self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True, 5018 atTime=atTime) 5019 try: 5020 actual = rh.computeRollover(currentTime) 5021 self.assertEqual(actual, currentTime + 12 * 60 * 60) 5022 5023 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 5024 self.assertEqual(actual, currentTime + 36 * 60 * 60) 5025 finally: 5026 rh.close() 5027 5028 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 5029 def test_compute_rollover_weekly_attime(self): 5030 currentTime = int(time.time()) 5031 today = currentTime - currentTime % 86400 5032 5033 atTime = datetime.time(12, 0, 0) 5034 5035 wday = time.gmtime(today).tm_wday 5036 for day in range(7): 5037 rh = logging.handlers.TimedRotatingFileHandler( 5038 self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True, 5039 atTime=atTime) 5040 try: 5041 if wday > day: 5042 # The rollover day has already passed this week, so we 5043 # go over into next week 5044 expected = (7 - wday + day) 5045 else: 5046 expected = (day - wday) 5047 # At this point expected is in days from now, convert to seconds 5048 expected *= 24 * 60 * 60 5049 # Add in the rollover time 5050 expected += 12 * 60 * 60 5051 # Add in adjustment for today 5052 expected += today 5053 actual = rh.computeRollover(today) 5054 if actual != expected: 5055 print('failed in timezone: %d' % time.timezone) 5056 print('local vars: %s' % 
locals()) 5057 self.assertEqual(actual, expected) 5058 if day == wday: 5059 # goes into following week 5060 expected += 7 * 24 * 60 * 60 5061 actual = rh.computeRollover(today + 13 * 60 * 60) 5062 if actual != expected: 5063 print('failed in timezone: %d' % time.timezone) 5064 print('local vars: %s' % locals()) 5065 self.assertEqual(actual, expected) 5066 finally: 5067 rh.close() 5068 5069 5070def secs(**kw): 5071 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 5072 5073for when, exp in (('S', 1), 5074 ('M', 60), 5075 ('H', 60 * 60), 5076 ('D', 60 * 60 * 24), 5077 ('MIDNIGHT', 60 * 60 * 24), 5078 # current time (epoch start) is a Thursday, W0 means Monday 5079 ('W0', secs(days=4, hours=24)), 5080 ): 5081 def test_compute_rollover(self, when=when, exp=exp): 5082 rh = logging.handlers.TimedRotatingFileHandler( 5083 self.fn, when=when, interval=1, backupCount=0, utc=True) 5084 currentTime = 0.0 5085 actual = rh.computeRollover(currentTime) 5086 if exp != actual: 5087 # Failures occur on some systems for MIDNIGHT and W0. 5088 # Print detailed calculation for MIDNIGHT so we can try to see 5089 # what's going on 5090 if when == 'MIDNIGHT': 5091 try: 5092 if rh.utc: 5093 t = time.gmtime(currentTime) 5094 else: 5095 t = time.localtime(currentTime) 5096 currentHour = t[3] 5097 currentMinute = t[4] 5098 currentSecond = t[5] 5099 # r is the number of seconds left between now and midnight 5100 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 5101 currentMinute) * 60 + 5102 currentSecond) 5103 result = currentTime + r 5104 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 5105 print('currentHour: %s' % currentHour, file=sys.stderr) 5106 print('currentMinute: %s' % currentMinute, file=sys.stderr) 5107 print('currentSecond: %s' % currentSecond, file=sys.stderr) 5108 print('r: %s' % r, file=sys.stderr) 5109 print('result: %s' % result, file=sys.stderr) 5110 except Exception: 5111 print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) 5112 self.assertEqual(exp, actual) 5113 rh.close() 5114 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 5115 5116 5117@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 5118class NTEventLogHandlerTest(BaseTest): 5119 def test_basic(self): 5120 logtype = 'Application' 5121 elh = win32evtlog.OpenEventLog(None, logtype) 5122 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 5123 5124 try: 5125 h = logging.handlers.NTEventLogHandler('test_logging') 5126 except pywintypes.error as e: 5127 if e.winerror == 5: # access denied 5128 raise unittest.SkipTest('Insufficient privileges to run test') 5129 raise 5130 5131 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 5132 h.handle(r) 5133 h.close() 5134 # Now see if the event is recorded 5135 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 5136 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 5137 win32evtlog.EVENTLOG_SEQUENTIAL_READ 5138 found = False 5139 GO_BACK = 100 5140 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 5141 for e in events: 5142 if e.SourceName != 'test_logging': 5143 continue 5144 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 5145 if msg != 'Test Log Message\r\n': 5146 continue 5147 found = True 5148 break 5149 msg = 'Record not found in event log, went back %d records' % GO_BACK 5150 self.assertTrue(found, msg=msg) 5151 5152 5153class MiscTestCase(unittest.TestCase): 5154 def test__all__(self): 5155 blacklist = {'logThreads', 
                     'logMultiprocessing',
                     'logProcesses', 'currentframe',
                     'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
                     'Filterer', 'PlaceHolder', 'Manager', 'RootLogger',
                     'root', 'threading'}
        support.check__all__(self, logging, blacklist=blacklist)


# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
@support.run_with_locale('LC_ALL', '')
def test_main():
    tests = [
        BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
        HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
        DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
        ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest,
        StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
        QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
        LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest,
        RotatingFileHandlerTest, LastResortTest, LogRecordTest,
        ExceptionTest, SysLogHandlerTest, IPv6SysLogHandlerTest, HTTPHandlerTest,
        NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
        MiscTestCase
    ]
    if hasattr(logging.handlers, 'QueueListener'):
        tests.append(QueueListenerTest)
    support.run_unittest(*tests)

if __name__ == "__main__":
    test_main()
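
# ---------------------------------------------------------------------------
# Note (illustrative only, not used by the test framework): since every class
# above is an ordinary unittest.TestCase, a single group of these tests can
# also be run directly during development, e.g.
#
#     python -m unittest -v test.test_logging.FormatterTest
#
# test_main() above remains the entry point used by the regression suite.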