1# Copyright 2001-2021 by Vinay Sajip. All Rights Reserved. 2# 3# Permission to use, copy, modify, and distribute this software and its 4# documentation for any purpose and without fee is hereby granted, 5# provided that the above copyright notice appear in all copies and that 6# both that copyright notice and this permission notice appear in 7# supporting documentation, and that the name of Vinay Sajip 8# not be used in advertising or publicity pertaining to distribution 9# of the software without specific, written prior permission. 10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING 11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR 13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER 14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 17"""Test harness for the logging module. Run all tests. 18 19Copyright (C) 2001-2021 Vinay Sajip. All Rights Reserved. 20""" 21 22import logging 23import logging.handlers 24import logging.config 25 26import codecs 27import configparser 28import copy 29import datetime 30import pathlib 31import pickle 32import io 33import gc 34import json 35import os 36import queue 37import random 38import re 39import shutil 40import socket 41import struct 42import sys 43import tempfile 44from test.support.script_helper import assert_python_ok, assert_python_failure 45from test import support 46from test.support import os_helper 47from test.support import socket_helper 48from test.support import threading_helper 49from test.support import warnings_helper 50from test.support.logging_helper import TestHandler 51import textwrap 52import threading 53import time 54import unittest 55import warnings 56import weakref 57 58from http.server import HTTPServer, BaseHTTPRequestHandler 59from urllib.parse import urlparse, parse_qs 60from socketserver import (ThreadingUDPServer, DatagramRequestHandler, 61 ThreadingTCPServer, StreamRequestHandler) 62 63with warnings.catch_warnings(): 64 warnings.simplefilter('ignore', DeprecationWarning) 65 import asyncore 66 import smtpd 67 68try: 69 import win32evtlog, win32evtlogutil, pywintypes 70except ImportError: 71 win32evtlog = win32evtlogutil = pywintypes = None 72 73try: 74 import zlib 75except ImportError: 76 pass 77 78class BaseTest(unittest.TestCase): 79 80 """Base class for logging tests.""" 81 82 log_format = "%(name)s -> %(levelname)s: %(message)s" 83 expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" 84 message_num = 0 85 86 def setUp(self): 87 """Setup the default logging stream to an internal StringIO instance, 88 so that we can examine log output as we want.""" 89 self._threading_key = threading_helper.threading_setup() 90 91 logger_dict = logging.getLogger().manager.loggerDict 92 logging._acquireLock() 93 try: 94 self.saved_handlers = logging._handlers.copy() 95 self.saved_handler_list = logging._handlerList[:] 96 self.saved_loggers = saved_loggers = logger_dict.copy() 97 self.saved_name_to_level = logging._nameToLevel.copy() 98 self.saved_level_to_name = logging._levelToName.copy() 99 self.logger_states = logger_states = {} 100 for name in saved_loggers: 101 logger_states[name] = getattr(saved_loggers[name], 102 'disabled', None) 103 finally: 104 logging._releaseLock() 105 106 # Set two unused loggers 107 self.logger1 = logging.getLogger("\xab\xd7\xbb") 108 self.logger2 = logging.getLogger("\u013f\u00d6\u0047") 109 110 self.root_logger = logging.getLogger("") 111 self.original_logging_level = self.root_logger.getEffectiveLevel() 112 113 self.stream = io.StringIO() 114 self.root_logger.setLevel(logging.DEBUG) 115 self.root_hdlr = logging.StreamHandler(self.stream) 116 self.root_formatter = logging.Formatter(self.log_format) 117 self.root_hdlr.setFormatter(self.root_formatter) 118 if self.logger1.hasHandlers(): 119 hlist = self.logger1.handlers + self.root_logger.handlers 120 raise AssertionError('Unexpected handlers: %s' % hlist) 121 if self.logger2.hasHandlers(): 122 hlist = self.logger2.handlers + self.root_logger.handlers 123 raise AssertionError('Unexpected handlers: %s' % hlist) 124 self.root_logger.addHandler(self.root_hdlr) 125 self.assertTrue(self.logger1.hasHandlers()) 126 self.assertTrue(self.logger2.hasHandlers()) 127 128 def tearDown(self): 129 """Remove our logging stream, and restore the original logging 130 level.""" 131 self.stream.close() 132 self.root_logger.removeHandler(self.root_hdlr) 133 while self.root_logger.handlers: 134 h = self.root_logger.handlers[0] 135 self.root_logger.removeHandler(h) 136 h.close() 137 self.root_logger.setLevel(self.original_logging_level) 138 logging._acquireLock() 139 try: 140 logging._levelToName.clear() 141 logging._levelToName.update(self.saved_level_to_name) 142 logging._nameToLevel.clear() 143 logging._nameToLevel.update(self.saved_name_to_level) 144 logging._handlers.clear() 145 logging._handlers.update(self.saved_handlers) 146 logging._handlerList[:] = self.saved_handler_list 147 manager = logging.getLogger().manager 148 manager.disable = 0 149 loggerDict = manager.loggerDict 150 loggerDict.clear() 151 loggerDict.update(self.saved_loggers) 152 logger_states = self.logger_states 153 for name in self.logger_states: 154 if logger_states[name] is not None: 155 self.saved_loggers[name].disabled = logger_states[name] 156 finally: 157 logging._releaseLock() 158 159 self.doCleanups() 160 threading_helper.threading_cleanup(*self._threading_key) 161 162 def assert_log_lines(self, expected_values, stream=None, pat=None): 163 """Match the collected log lines against the regular expression 164 self.expected_log_pat, and compare the extracted group values to 165 the expected_values list of tuples.""" 166 stream = stream or self.stream 167 pat = re.compile(pat or self.expected_log_pat) 168 actual_lines = stream.getvalue().splitlines() 169 self.assertEqual(len(actual_lines), len(expected_values)) 170 for actual, expected in zip(actual_lines, expected_values): 171 match = pat.search(actual) 172 if not match: 173 self.fail("Log line does not match expected pattern:\n" + 174 actual) 175 self.assertEqual(tuple(match.groups()), expected) 176 s = stream.read() 177 if s: 178 self.fail("Remaining output at end of log stream:\n" + s) 179 180 def next_message(self): 181 """Generate a message consisting solely of an auto-incrementing 182 integer.""" 183 self.message_num += 1 184 return "%d" % self.message_num 185 186 187class BuiltinLevelsTest(BaseTest): 188 """Test builtin levels and their inheritance.""" 189 190 def test_flat(self): 191 # Logging levels in a flat logger namespace. 192 m = self.next_message 193 194 ERR = logging.getLogger("ERR") 195 ERR.setLevel(logging.ERROR) 196 INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) 197 INF.setLevel(logging.INFO) 198 DEB = logging.getLogger("DEB") 199 DEB.setLevel(logging.DEBUG) 200 201 # These should log. 202 ERR.log(logging.CRITICAL, m()) 203 ERR.error(m()) 204 205 INF.log(logging.CRITICAL, m()) 206 INF.error(m()) 207 INF.warning(m()) 208 INF.info(m()) 209 210 DEB.log(logging.CRITICAL, m()) 211 DEB.error(m()) 212 DEB.warning(m()) 213 DEB.info(m()) 214 DEB.debug(m()) 215 216 # These should not log. 217 ERR.warning(m()) 218 ERR.info(m()) 219 ERR.debug(m()) 220 221 INF.debug(m()) 222 223 self.assert_log_lines([ 224 ('ERR', 'CRITICAL', '1'), 225 ('ERR', 'ERROR', '2'), 226 ('INF', 'CRITICAL', '3'), 227 ('INF', 'ERROR', '4'), 228 ('INF', 'WARNING', '5'), 229 ('INF', 'INFO', '6'), 230 ('DEB', 'CRITICAL', '7'), 231 ('DEB', 'ERROR', '8'), 232 ('DEB', 'WARNING', '9'), 233 ('DEB', 'INFO', '10'), 234 ('DEB', 'DEBUG', '11'), 235 ]) 236 237 def test_nested_explicit(self): 238 # Logging levels in a nested namespace, all explicitly set. 239 m = self.next_message 240 241 INF = logging.getLogger("INF") 242 INF.setLevel(logging.INFO) 243 INF_ERR = logging.getLogger("INF.ERR") 244 INF_ERR.setLevel(logging.ERROR) 245 246 # These should log. 247 INF_ERR.log(logging.CRITICAL, m()) 248 INF_ERR.error(m()) 249 250 # These should not log. 251 INF_ERR.warning(m()) 252 INF_ERR.info(m()) 253 INF_ERR.debug(m()) 254 255 self.assert_log_lines([ 256 ('INF.ERR', 'CRITICAL', '1'), 257 ('INF.ERR', 'ERROR', '2'), 258 ]) 259 260 def test_nested_inherited(self): 261 # Logging levels in a nested namespace, inherited from parent loggers. 262 m = self.next_message 263 264 INF = logging.getLogger("INF") 265 INF.setLevel(logging.INFO) 266 INF_ERR = logging.getLogger("INF.ERR") 267 INF_ERR.setLevel(logging.ERROR) 268 INF_UNDEF = logging.getLogger("INF.UNDEF") 269 INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") 270 UNDEF = logging.getLogger("UNDEF") 271 272 # These should log. 273 INF_UNDEF.log(logging.CRITICAL, m()) 274 INF_UNDEF.error(m()) 275 INF_UNDEF.warning(m()) 276 INF_UNDEF.info(m()) 277 INF_ERR_UNDEF.log(logging.CRITICAL, m()) 278 INF_ERR_UNDEF.error(m()) 279 280 # These should not log. 281 INF_UNDEF.debug(m()) 282 INF_ERR_UNDEF.warning(m()) 283 INF_ERR_UNDEF.info(m()) 284 INF_ERR_UNDEF.debug(m()) 285 286 self.assert_log_lines([ 287 ('INF.UNDEF', 'CRITICAL', '1'), 288 ('INF.UNDEF', 'ERROR', '2'), 289 ('INF.UNDEF', 'WARNING', '3'), 290 ('INF.UNDEF', 'INFO', '4'), 291 ('INF.ERR.UNDEF', 'CRITICAL', '5'), 292 ('INF.ERR.UNDEF', 'ERROR', '6'), 293 ]) 294 295 def test_nested_with_virtual_parent(self): 296 # Logging levels when some parent does not exist yet. 297 m = self.next_message 298 299 INF = logging.getLogger("INF") 300 GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") 301 CHILD = logging.getLogger("INF.BADPARENT") 302 INF.setLevel(logging.INFO) 303 304 # These should log. 305 GRANDCHILD.log(logging.FATAL, m()) 306 GRANDCHILD.info(m()) 307 CHILD.log(logging.FATAL, m()) 308 CHILD.info(m()) 309 310 # These should not log. 311 GRANDCHILD.debug(m()) 312 CHILD.debug(m()) 313 314 self.assert_log_lines([ 315 ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), 316 ('INF.BADPARENT.UNDEF', 'INFO', '2'), 317 ('INF.BADPARENT', 'CRITICAL', '3'), 318 ('INF.BADPARENT', 'INFO', '4'), 319 ]) 320 321 def test_regression_22386(self): 322 """See issue #22386 for more information.""" 323 self.assertEqual(logging.getLevelName('INFO'), logging.INFO) 324 self.assertEqual(logging.getLevelName(logging.INFO), 'INFO') 325 326 def test_issue27935(self): 327 fatal = logging.getLevelName('FATAL') 328 self.assertEqual(fatal, logging.FATAL) 329 330 def test_regression_29220(self): 331 """See issue #29220 for more information.""" 332 logging.addLevelName(logging.INFO, '') 333 self.addCleanup(logging.addLevelName, logging.INFO, 'INFO') 334 self.assertEqual(logging.getLevelName(logging.INFO), '') 335 self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET') 336 self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET) 337 338class BasicFilterTest(BaseTest): 339 340 """Test the bundled Filter class.""" 341 342 def test_filter(self): 343 # Only messages satisfying the specified criteria pass through the 344 # filter. 345 filter_ = logging.Filter("spam.eggs") 346 handler = self.root_logger.handlers[0] 347 try: 348 handler.addFilter(filter_) 349 spam = logging.getLogger("spam") 350 spam_eggs = logging.getLogger("spam.eggs") 351 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 352 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 353 354 spam.info(self.next_message()) 355 spam_eggs.info(self.next_message()) # Good. 356 spam_eggs_fish.info(self.next_message()) # Good. 357 spam_bakedbeans.info(self.next_message()) 358 359 self.assert_log_lines([ 360 ('spam.eggs', 'INFO', '2'), 361 ('spam.eggs.fish', 'INFO', '3'), 362 ]) 363 finally: 364 handler.removeFilter(filter_) 365 366 def test_callable_filter(self): 367 # Only messages satisfying the specified criteria pass through the 368 # filter. 369 370 def filterfunc(record): 371 parts = record.name.split('.') 372 prefix = '.'.join(parts[:2]) 373 return prefix == 'spam.eggs' 374 375 handler = self.root_logger.handlers[0] 376 try: 377 handler.addFilter(filterfunc) 378 spam = logging.getLogger("spam") 379 spam_eggs = logging.getLogger("spam.eggs") 380 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 381 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 382 383 spam.info(self.next_message()) 384 spam_eggs.info(self.next_message()) # Good. 385 spam_eggs_fish.info(self.next_message()) # Good. 386 spam_bakedbeans.info(self.next_message()) 387 388 self.assert_log_lines([ 389 ('spam.eggs', 'INFO', '2'), 390 ('spam.eggs.fish', 'INFO', '3'), 391 ]) 392 finally: 393 handler.removeFilter(filterfunc) 394 395 def test_empty_filter(self): 396 f = logging.Filter() 397 r = logging.makeLogRecord({'name': 'spam.eggs'}) 398 self.assertTrue(f.filter(r)) 399 400# 401# First, we define our levels. There can be as many as you want - the only 402# limitations are that they should be integers, the lowest should be > 0 and 403# larger values mean less information being logged. If you need specific 404# level values which do not fit into these limitations, you can use a 405# mapping dictionary to convert between your application levels and the 406# logging system. 407# 408SILENT = 120 409TACITURN = 119 410TERSE = 118 411EFFUSIVE = 117 412SOCIABLE = 116 413VERBOSE = 115 414TALKATIVE = 114 415GARRULOUS = 113 416CHATTERBOX = 112 417BORING = 111 418 419LEVEL_RANGE = range(BORING, SILENT + 1) 420 421# 422# Next, we define names for our levels. You don't need to do this - in which 423# case the system will use "Level n" to denote the text for the level. 424# 425my_logging_levels = { 426 SILENT : 'Silent', 427 TACITURN : 'Taciturn', 428 TERSE : 'Terse', 429 EFFUSIVE : 'Effusive', 430 SOCIABLE : 'Sociable', 431 VERBOSE : 'Verbose', 432 TALKATIVE : 'Talkative', 433 GARRULOUS : 'Garrulous', 434 CHATTERBOX : 'Chatterbox', 435 BORING : 'Boring', 436} 437 438class GarrulousFilter(logging.Filter): 439 440 """A filter which blocks garrulous messages.""" 441 442 def filter(self, record): 443 return record.levelno != GARRULOUS 444 445class VerySpecificFilter(logging.Filter): 446 447 """A filter which blocks sociable and taciturn messages.""" 448 449 def filter(self, record): 450 return record.levelno not in [SOCIABLE, TACITURN] 451 452 453class CustomLevelsAndFiltersTest(BaseTest): 454 455 """Test various filtering possibilities with custom logging levels.""" 456 457 # Skip the logger name group. 458 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 459 460 def setUp(self): 461 BaseTest.setUp(self) 462 for k, v in my_logging_levels.items(): 463 logging.addLevelName(k, v) 464 465 def log_at_all_levels(self, logger): 466 for lvl in LEVEL_RANGE: 467 logger.log(lvl, self.next_message()) 468 469 def test_logger_filter(self): 470 # Filter at logger level. 471 self.root_logger.setLevel(VERBOSE) 472 # Levels >= 'Verbose' are good. 473 self.log_at_all_levels(self.root_logger) 474 self.assert_log_lines([ 475 ('Verbose', '5'), 476 ('Sociable', '6'), 477 ('Effusive', '7'), 478 ('Terse', '8'), 479 ('Taciturn', '9'), 480 ('Silent', '10'), 481 ]) 482 483 def test_handler_filter(self): 484 # Filter at handler level. 485 self.root_logger.handlers[0].setLevel(SOCIABLE) 486 try: 487 # Levels >= 'Sociable' are good. 488 self.log_at_all_levels(self.root_logger) 489 self.assert_log_lines([ 490 ('Sociable', '6'), 491 ('Effusive', '7'), 492 ('Terse', '8'), 493 ('Taciturn', '9'), 494 ('Silent', '10'), 495 ]) 496 finally: 497 self.root_logger.handlers[0].setLevel(logging.NOTSET) 498 499 def test_specific_filters(self): 500 # Set a specific filter object on the handler, and then add another 501 # filter object on the logger itself. 502 handler = self.root_logger.handlers[0] 503 specific_filter = None 504 garr = GarrulousFilter() 505 handler.addFilter(garr) 506 try: 507 self.log_at_all_levels(self.root_logger) 508 first_lines = [ 509 # Notice how 'Garrulous' is missing 510 ('Boring', '1'), 511 ('Chatterbox', '2'), 512 ('Talkative', '4'), 513 ('Verbose', '5'), 514 ('Sociable', '6'), 515 ('Effusive', '7'), 516 ('Terse', '8'), 517 ('Taciturn', '9'), 518 ('Silent', '10'), 519 ] 520 self.assert_log_lines(first_lines) 521 522 specific_filter = VerySpecificFilter() 523 self.root_logger.addFilter(specific_filter) 524 self.log_at_all_levels(self.root_logger) 525 self.assert_log_lines(first_lines + [ 526 # Not only 'Garrulous' is still missing, but also 'Sociable' 527 # and 'Taciturn' 528 ('Boring', '11'), 529 ('Chatterbox', '12'), 530 ('Talkative', '14'), 531 ('Verbose', '15'), 532 ('Effusive', '17'), 533 ('Terse', '18'), 534 ('Silent', '20'), 535 ]) 536 finally: 537 if specific_filter: 538 self.root_logger.removeFilter(specific_filter) 539 handler.removeFilter(garr) 540 541 542class HandlerTest(BaseTest): 543 def test_name(self): 544 h = logging.Handler() 545 h.name = 'generic' 546 self.assertEqual(h.name, 'generic') 547 h.name = 'anothergeneric' 548 self.assertEqual(h.name, 'anothergeneric') 549 self.assertRaises(NotImplementedError, h.emit, None) 550 551 def test_builtin_handlers(self): 552 # We can't actually *use* too many handlers in the tests, 553 # but we can try instantiating them with various options 554 if sys.platform in ('linux', 'darwin'): 555 for existing in (True, False): 556 fd, fn = tempfile.mkstemp() 557 os.close(fd) 558 if not existing: 559 os.unlink(fn) 560 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True) 561 if existing: 562 dev, ino = h.dev, h.ino 563 self.assertEqual(dev, -1) 564 self.assertEqual(ino, -1) 565 r = logging.makeLogRecord({'msg': 'Test'}) 566 h.handle(r) 567 # Now remove the file. 568 os.unlink(fn) 569 self.assertFalse(os.path.exists(fn)) 570 # The next call should recreate the file. 571 h.handle(r) 572 self.assertTrue(os.path.exists(fn)) 573 else: 574 self.assertEqual(h.dev, -1) 575 self.assertEqual(h.ino, -1) 576 h.close() 577 if existing: 578 os.unlink(fn) 579 if sys.platform == 'darwin': 580 sockname = '/var/run/syslog' 581 else: 582 sockname = '/dev/log' 583 try: 584 h = logging.handlers.SysLogHandler(sockname) 585 self.assertEqual(h.facility, h.LOG_USER) 586 self.assertTrue(h.unixsocket) 587 h.close() 588 except OSError: # syslogd might not be available 589 pass 590 for method in ('GET', 'POST', 'PUT'): 591 if method == 'PUT': 592 self.assertRaises(ValueError, logging.handlers.HTTPHandler, 593 'localhost', '/log', method) 594 else: 595 h = logging.handlers.HTTPHandler('localhost', '/log', method) 596 h.close() 597 h = logging.handlers.BufferingHandler(0) 598 r = logging.makeLogRecord({}) 599 self.assertTrue(h.shouldFlush(r)) 600 h.close() 601 h = logging.handlers.BufferingHandler(1) 602 self.assertFalse(h.shouldFlush(r)) 603 h.close() 604 605 def test_path_objects(self): 606 """ 607 Test that Path objects are accepted as filename arguments to handlers. 608 609 See Issue #27493. 610 """ 611 fd, fn = tempfile.mkstemp() 612 os.close(fd) 613 os.unlink(fn) 614 pfn = pathlib.Path(fn) 615 cases = ( 616 (logging.FileHandler, (pfn, 'w')), 617 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 618 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 619 ) 620 if sys.platform in ('linux', 'darwin'): 621 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 622 for cls, args in cases: 623 h = cls(*args, encoding="utf-8") 624 self.assertTrue(os.path.exists(fn)) 625 h.close() 626 os.unlink(fn) 627 628 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 629 def test_race(self): 630 # Issue #14632 refers. 631 def remove_loop(fname, tries): 632 for _ in range(tries): 633 try: 634 os.unlink(fname) 635 self.deletion_time = time.time() 636 except OSError: 637 pass 638 time.sleep(0.004 * random.randint(0, 4)) 639 640 del_count = 500 641 log_count = 500 642 643 self.handle_time = None 644 self.deletion_time = None 645 646 for delay in (False, True): 647 fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') 648 os.close(fd) 649 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 650 remover.daemon = True 651 remover.start() 652 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay) 653 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 654 h.setFormatter(f) 655 try: 656 for _ in range(log_count): 657 time.sleep(0.005) 658 r = logging.makeLogRecord({'msg': 'testing' }) 659 try: 660 self.handle_time = time.time() 661 h.handle(r) 662 except Exception: 663 print('Deleted at %s, ' 664 'opened at %s' % (self.deletion_time, 665 self.handle_time)) 666 raise 667 finally: 668 remover.join() 669 h.close() 670 if os.path.exists(fn): 671 os.unlink(fn) 672 673 # The implementation relies on os.register_at_fork existing, but we test 674 # based on os.fork existing because that is what users and this test use. 675 # This helps ensure that when fork exists (the important concept) that the 676 # register_at_fork mechanism is also present and used. 677 @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().') 678 def test_post_fork_child_no_deadlock(self): 679 """Ensure child logging locks are not held; bpo-6721 & bpo-36533.""" 680 class _OurHandler(logging.Handler): 681 def __init__(self): 682 super().__init__() 683 self.sub_handler = logging.StreamHandler( 684 stream=open('/dev/null', 'wt', encoding='utf-8')) 685 686 def emit(self, record): 687 self.sub_handler.acquire() 688 try: 689 self.sub_handler.emit(record) 690 finally: 691 self.sub_handler.release() 692 693 self.assertEqual(len(logging._handlers), 0) 694 refed_h = _OurHandler() 695 self.addCleanup(refed_h.sub_handler.stream.close) 696 refed_h.name = 'because we need at least one for this test' 697 self.assertGreater(len(logging._handlers), 0) 698 self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1) 699 test_logger = logging.getLogger('test_post_fork_child_no_deadlock') 700 test_logger.addHandler(refed_h) 701 test_logger.setLevel(logging.DEBUG) 702 703 locks_held__ready_to_fork = threading.Event() 704 fork_happened__release_locks_and_end_thread = threading.Event() 705 706 def lock_holder_thread_fn(): 707 logging._acquireLock() 708 try: 709 refed_h.acquire() 710 try: 711 # Tell the main thread to do the fork. 712 locks_held__ready_to_fork.set() 713 714 # If the deadlock bug exists, the fork will happen 715 # without dealing with the locks we hold, deadlocking 716 # the child. 717 718 # Wait for a successful fork or an unreasonable amount of 719 # time before releasing our locks. To avoid a timing based 720 # test we'd need communication from os.fork() as to when it 721 # has actually happened. Given this is a regression test 722 # for a fixed issue, potentially less reliably detecting 723 # regression via timing is acceptable for simplicity. 724 # The test will always take at least this long. :( 725 fork_happened__release_locks_and_end_thread.wait(0.5) 726 finally: 727 refed_h.release() 728 finally: 729 logging._releaseLock() 730 731 lock_holder_thread = threading.Thread( 732 target=lock_holder_thread_fn, 733 name='test_post_fork_child_no_deadlock lock holder') 734 lock_holder_thread.start() 735 736 locks_held__ready_to_fork.wait() 737 pid = os.fork() 738 if pid == 0: 739 # Child process 740 try: 741 test_logger.info(r'Child process did not deadlock. \o/') 742 finally: 743 os._exit(0) 744 else: 745 # Parent process 746 test_logger.info(r'Parent process returned from fork. \o/') 747 fork_happened__release_locks_and_end_thread.set() 748 lock_holder_thread.join() 749 750 support.wait_process(pid, exitcode=0) 751 752 753class BadStream(object): 754 def write(self, data): 755 raise RuntimeError('deliberate mistake') 756 757class TestStreamHandler(logging.StreamHandler): 758 def handleError(self, record): 759 self.error_record = record 760 761class StreamWithIntName(object): 762 level = logging.NOTSET 763 name = 2 764 765class StreamHandlerTest(BaseTest): 766 def test_error_handling(self): 767 h = TestStreamHandler(BadStream()) 768 r = logging.makeLogRecord({}) 769 old_raise = logging.raiseExceptions 770 771 try: 772 h.handle(r) 773 self.assertIs(h.error_record, r) 774 775 h = logging.StreamHandler(BadStream()) 776 with support.captured_stderr() as stderr: 777 h.handle(r) 778 msg = '\nRuntimeError: deliberate mistake\n' 779 self.assertIn(msg, stderr.getvalue()) 780 781 logging.raiseExceptions = False 782 with support.captured_stderr() as stderr: 783 h.handle(r) 784 self.assertEqual('', stderr.getvalue()) 785 finally: 786 logging.raiseExceptions = old_raise 787 788 def test_stream_setting(self): 789 """ 790 Test setting the handler's stream 791 """ 792 h = logging.StreamHandler() 793 stream = io.StringIO() 794 old = h.setStream(stream) 795 self.assertIs(old, sys.stderr) 796 actual = h.setStream(old) 797 self.assertIs(actual, stream) 798 # test that setting to existing value returns None 799 actual = h.setStream(old) 800 self.assertIsNone(actual) 801 802 def test_can_represent_stream_with_int_name(self): 803 h = logging.StreamHandler(StreamWithIntName()) 804 self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>') 805 806# -- The following section could be moved into a server_helper.py module 807# -- if it proves to be of wider utility than just test_logging 808 809class TestSMTPServer(smtpd.SMTPServer): 810 """ 811 This class implements a test SMTP server. 812 813 :param addr: A (host, port) tuple which the server listens on. 814 You can specify a port value of zero: the server's 815 *port* attribute will hold the actual port number 816 used, which can be used in client connections. 817 :param handler: A callable which will be called to process 818 incoming messages. The handler will be passed 819 the client address tuple, who the message is from, 820 a list of recipients and the message data. 821 :param poll_interval: The interval, in seconds, used in the underlying 822 :func:`select` or :func:`poll` call by 823 :func:`asyncore.loop`. 824 :param sockmap: A dictionary which will be used to hold 825 :class:`asyncore.dispatcher` instances used by 826 :func:`asyncore.loop`. This avoids changing the 827 :mod:`asyncore` module's global state. 828 """ 829 830 def __init__(self, addr, handler, poll_interval, sockmap): 831 smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, 832 decode_data=True) 833 self.port = self.socket.getsockname()[1] 834 self._handler = handler 835 self._thread = None 836 self._quit = False 837 self.poll_interval = poll_interval 838 839 def process_message(self, peer, mailfrom, rcpttos, data): 840 """ 841 Delegates to the handler passed in to the server's constructor. 842 843 Typically, this will be a test case method. 844 :param peer: The client (host, port) tuple. 845 :param mailfrom: The address of the sender. 846 :param rcpttos: The addresses of the recipients. 847 :param data: The message. 848 """ 849 self._handler(peer, mailfrom, rcpttos, data) 850 851 def start(self): 852 """ 853 Start the server running on a separate daemon thread. 854 """ 855 self._thread = t = threading.Thread(target=self.serve_forever, 856 args=(self.poll_interval,)) 857 t.daemon = True 858 t.start() 859 860 def serve_forever(self, poll_interval): 861 """ 862 Run the :mod:`asyncore` loop until normal termination 863 conditions arise. 864 :param poll_interval: The interval, in seconds, used in the underlying 865 :func:`select` or :func:`poll` call by 866 :func:`asyncore.loop`. 867 """ 868 while not self._quit: 869 asyncore.loop(poll_interval, map=self._map, count=1) 870 871 def stop(self): 872 """ 873 Stop the thread by closing the server instance. 874 Wait for the server thread to terminate. 875 """ 876 self._quit = True 877 threading_helper.join_thread(self._thread) 878 self._thread = None 879 self.close() 880 asyncore.close_all(map=self._map, ignore_all=True) 881 882 883class ControlMixin(object): 884 """ 885 This mixin is used to start a server on a separate thread, and 886 shut it down programmatically. Request handling is simplified - instead 887 of needing to derive a suitable RequestHandler subclass, you just 888 provide a callable which will be passed each received request to be 889 processed. 890 891 :param handler: A handler callable which will be called with a 892 single parameter - the request - in order to 893 process the request. This handler is called on the 894 server thread, effectively meaning that requests are 895 processed serially. While not quite web scale ;-), 896 this should be fine for testing applications. 897 :param poll_interval: The polling interval in seconds. 898 """ 899 def __init__(self, handler, poll_interval): 900 self._thread = None 901 self.poll_interval = poll_interval 902 self._handler = handler 903 self.ready = threading.Event() 904 905 def start(self): 906 """ 907 Create a daemon thread to run the server, and start it. 908 """ 909 self._thread = t = threading.Thread(target=self.serve_forever, 910 args=(self.poll_interval,)) 911 t.daemon = True 912 t.start() 913 914 def serve_forever(self, poll_interval): 915 """ 916 Run the server. Set the ready flag before entering the 917 service loop. 918 """ 919 self.ready.set() 920 super(ControlMixin, self).serve_forever(poll_interval) 921 922 def stop(self): 923 """ 924 Tell the server thread to stop, and wait for it to do so. 925 """ 926 self.shutdown() 927 if self._thread is not None: 928 threading_helper.join_thread(self._thread) 929 self._thread = None 930 self.server_close() 931 self.ready.clear() 932 933class TestHTTPServer(ControlMixin, HTTPServer): 934 """ 935 An HTTP server which is controllable using :class:`ControlMixin`. 936 937 :param addr: A tuple with the IP address and port to listen on. 938 :param handler: A handler callable which will be called with a 939 single parameter - the request - in order to 940 process the request. 941 :param poll_interval: The polling interval in seconds. 942 :param log: Pass ``True`` to enable log messages. 943 """ 944 def __init__(self, addr, handler, poll_interval=0.5, 945 log=False, sslctx=None): 946 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 947 def __getattr__(self, name, default=None): 948 if name.startswith('do_'): 949 return self.process_request 950 raise AttributeError(name) 951 952 def process_request(self): 953 self.server._handler(self) 954 955 def log_message(self, format, *args): 956 if log: 957 super(DelegatingHTTPRequestHandler, 958 self).log_message(format, *args) 959 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 960 ControlMixin.__init__(self, handler, poll_interval) 961 self.sslctx = sslctx 962 963 def get_request(self): 964 try: 965 sock, addr = self.socket.accept() 966 if self.sslctx: 967 sock = self.sslctx.wrap_socket(sock, server_side=True) 968 except OSError as e: 969 # socket errors are silenced by the caller, print them here 970 sys.stderr.write("Got an error:\n%s\n" % e) 971 raise 972 return sock, addr 973 974class TestTCPServer(ControlMixin, ThreadingTCPServer): 975 """ 976 A TCP server which is controllable using :class:`ControlMixin`. 977 978 :param addr: A tuple with the IP address and port to listen on. 979 :param handler: A handler callable which will be called with a single 980 parameter - the request - in order to process the request. 981 :param poll_interval: The polling interval in seconds. 982 :bind_and_activate: If True (the default), binds the server and starts it 983 listening. If False, you need to call 984 :meth:`server_bind` and :meth:`server_activate` at 985 some later time before calling :meth:`start`, so that 986 the server will set up the socket and listen on it. 987 """ 988 989 allow_reuse_address = True 990 991 def __init__(self, addr, handler, poll_interval=0.5, 992 bind_and_activate=True): 993 class DelegatingTCPRequestHandler(StreamRequestHandler): 994 995 def handle(self): 996 self.server._handler(self) 997 ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, 998 bind_and_activate) 999 ControlMixin.__init__(self, handler, poll_interval) 1000 1001 def server_bind(self): 1002 super(TestTCPServer, self).server_bind() 1003 self.port = self.socket.getsockname()[1] 1004 1005class TestUDPServer(ControlMixin, ThreadingUDPServer): 1006 """ 1007 A UDP server which is controllable using :class:`ControlMixin`. 1008 1009 :param addr: A tuple with the IP address and port to listen on. 1010 :param handler: A handler callable which will be called with a 1011 single parameter - the request - in order to 1012 process the request. 1013 :param poll_interval: The polling interval for shutdown requests, 1014 in seconds. 1015 :bind_and_activate: If True (the default), binds the server and 1016 starts it listening. If False, you need to 1017 call :meth:`server_bind` and 1018 :meth:`server_activate` at some later time 1019 before calling :meth:`start`, so that the server will 1020 set up the socket and listen on it. 1021 """ 1022 def __init__(self, addr, handler, poll_interval=0.5, 1023 bind_and_activate=True): 1024 class DelegatingUDPRequestHandler(DatagramRequestHandler): 1025 1026 def handle(self): 1027 self.server._handler(self) 1028 1029 def finish(self): 1030 data = self.wfile.getvalue() 1031 if data: 1032 try: 1033 super(DelegatingUDPRequestHandler, self).finish() 1034 except OSError: 1035 if not self.server._closed: 1036 raise 1037 1038 ThreadingUDPServer.__init__(self, addr, 1039 DelegatingUDPRequestHandler, 1040 bind_and_activate) 1041 ControlMixin.__init__(self, handler, poll_interval) 1042 self._closed = False 1043 1044 def server_bind(self): 1045 super(TestUDPServer, self).server_bind() 1046 self.port = self.socket.getsockname()[1] 1047 1048 def server_close(self): 1049 super(TestUDPServer, self).server_close() 1050 self._closed = True 1051 1052if hasattr(socket, "AF_UNIX"): 1053 class TestUnixStreamServer(TestTCPServer): 1054 address_family = socket.AF_UNIX 1055 1056 class TestUnixDatagramServer(TestUDPServer): 1057 address_family = socket.AF_UNIX 1058 1059# - end of server_helper section 1060 1061class SMTPHandlerTest(BaseTest): 1062 # bpo-14314, bpo-19665, bpo-34092: don't wait forever 1063 TIMEOUT = support.LONG_TIMEOUT 1064 1065 def test_basic(self): 1066 sockmap = {} 1067 server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001, 1068 sockmap) 1069 server.start() 1070 addr = (socket_helper.HOST, server.port) 1071 h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', 1072 timeout=self.TIMEOUT) 1073 self.assertEqual(h.toaddrs, ['you']) 1074 self.messages = [] 1075 r = logging.makeLogRecord({'msg': 'Hello \u2713'}) 1076 self.handled = threading.Event() 1077 h.handle(r) 1078 self.handled.wait(self.TIMEOUT) 1079 server.stop() 1080 self.assertTrue(self.handled.is_set()) 1081 self.assertEqual(len(self.messages), 1) 1082 peer, mailfrom, rcpttos, data = self.messages[0] 1083 self.assertEqual(mailfrom, 'me') 1084 self.assertEqual(rcpttos, ['you']) 1085 self.assertIn('\nSubject: Log\n', data) 1086 self.assertTrue(data.endswith('\n\nHello \u2713')) 1087 h.close() 1088 1089 def process_message(self, *args): 1090 self.messages.append(args) 1091 self.handled.set() 1092 1093class MemoryHandlerTest(BaseTest): 1094 1095 """Tests for the MemoryHandler.""" 1096 1097 # Do not bother with a logger name group. 1098 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 1099 1100 def setUp(self): 1101 BaseTest.setUp(self) 1102 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1103 self.root_hdlr) 1104 self.mem_logger = logging.getLogger('mem') 1105 self.mem_logger.propagate = 0 1106 self.mem_logger.addHandler(self.mem_hdlr) 1107 1108 def tearDown(self): 1109 self.mem_hdlr.close() 1110 BaseTest.tearDown(self) 1111 1112 def test_flush(self): 1113 # The memory handler flushes to its target handler based on specific 1114 # criteria (message count and message level). 1115 self.mem_logger.debug(self.next_message()) 1116 self.assert_log_lines([]) 1117 self.mem_logger.info(self.next_message()) 1118 self.assert_log_lines([]) 1119 # This will flush because the level is >= logging.WARNING 1120 self.mem_logger.warning(self.next_message()) 1121 lines = [ 1122 ('DEBUG', '1'), 1123 ('INFO', '2'), 1124 ('WARNING', '3'), 1125 ] 1126 self.assert_log_lines(lines) 1127 for n in (4, 14): 1128 for i in range(9): 1129 self.mem_logger.debug(self.next_message()) 1130 self.assert_log_lines(lines) 1131 # This will flush because it's the 10th message since the last 1132 # flush. 1133 self.mem_logger.debug(self.next_message()) 1134 lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] 1135 self.assert_log_lines(lines) 1136 1137 self.mem_logger.debug(self.next_message()) 1138 self.assert_log_lines(lines) 1139 1140 def test_flush_on_close(self): 1141 """ 1142 Test that the flush-on-close configuration works as expected. 1143 """ 1144 self.mem_logger.debug(self.next_message()) 1145 self.assert_log_lines([]) 1146 self.mem_logger.info(self.next_message()) 1147 self.assert_log_lines([]) 1148 self.mem_logger.removeHandler(self.mem_hdlr) 1149 # Default behaviour is to flush on close. Check that it happens. 1150 self.mem_hdlr.close() 1151 lines = [ 1152 ('DEBUG', '1'), 1153 ('INFO', '2'), 1154 ] 1155 self.assert_log_lines(lines) 1156 # Now configure for flushing not to be done on close. 1157 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1158 self.root_hdlr, 1159 False) 1160 self.mem_logger.addHandler(self.mem_hdlr) 1161 self.mem_logger.debug(self.next_message()) 1162 self.assert_log_lines(lines) # no change 1163 self.mem_logger.info(self.next_message()) 1164 self.assert_log_lines(lines) # no change 1165 self.mem_logger.removeHandler(self.mem_hdlr) 1166 self.mem_hdlr.close() 1167 # assert that no new lines have been added 1168 self.assert_log_lines(lines) # no change 1169 1170 def test_race_between_set_target_and_flush(self): 1171 class MockRaceConditionHandler: 1172 def __init__(self, mem_hdlr): 1173 self.mem_hdlr = mem_hdlr 1174 self.threads = [] 1175 1176 def removeTarget(self): 1177 self.mem_hdlr.setTarget(None) 1178 1179 def handle(self, msg): 1180 thread = threading.Thread(target=self.removeTarget) 1181 self.threads.append(thread) 1182 thread.start() 1183 1184 target = MockRaceConditionHandler(self.mem_hdlr) 1185 try: 1186 self.mem_hdlr.setTarget(target) 1187 1188 for _ in range(10): 1189 time.sleep(0.005) 1190 self.mem_logger.info("not flushed") 1191 self.mem_logger.warning("flushed") 1192 finally: 1193 for thread in target.threads: 1194 threading_helper.join_thread(thread) 1195 1196 1197class ExceptionFormatter(logging.Formatter): 1198 """A special exception formatter.""" 1199 def formatException(self, ei): 1200 return "Got a [%s]" % ei[0].__name__ 1201 1202 1203class ConfigFileTest(BaseTest): 1204 1205 """Reading logging config from a .ini-style config file.""" 1206 1207 check_no_resource_warning = warnings_helper.check_no_resource_warning 1208 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1209 1210 # config0 is a standard configuration. 1211 config0 = """ 1212 [loggers] 1213 keys=root 1214 1215 [handlers] 1216 keys=hand1 1217 1218 [formatters] 1219 keys=form1 1220 1221 [logger_root] 1222 level=WARNING 1223 handlers=hand1 1224 1225 [handler_hand1] 1226 class=StreamHandler 1227 level=NOTSET 1228 formatter=form1 1229 args=(sys.stdout,) 1230 1231 [formatter_form1] 1232 format=%(levelname)s ++ %(message)s 1233 datefmt= 1234 """ 1235 1236 # config1 adds a little to the standard configuration. 1237 config1 = """ 1238 [loggers] 1239 keys=root,parser 1240 1241 [handlers] 1242 keys=hand1 1243 1244 [formatters] 1245 keys=form1 1246 1247 [logger_root] 1248 level=WARNING 1249 handlers= 1250 1251 [logger_parser] 1252 level=DEBUG 1253 handlers=hand1 1254 propagate=1 1255 qualname=compiler.parser 1256 1257 [handler_hand1] 1258 class=StreamHandler 1259 level=NOTSET 1260 formatter=form1 1261 args=(sys.stdout,) 1262 1263 [formatter_form1] 1264 format=%(levelname)s ++ %(message)s 1265 datefmt= 1266 """ 1267 1268 # config1a moves the handler to the root. 1269 config1a = """ 1270 [loggers] 1271 keys=root,parser 1272 1273 [handlers] 1274 keys=hand1 1275 1276 [formatters] 1277 keys=form1 1278 1279 [logger_root] 1280 level=WARNING 1281 handlers=hand1 1282 1283 [logger_parser] 1284 level=DEBUG 1285 handlers= 1286 propagate=1 1287 qualname=compiler.parser 1288 1289 [handler_hand1] 1290 class=StreamHandler 1291 level=NOTSET 1292 formatter=form1 1293 args=(sys.stdout,) 1294 1295 [formatter_form1] 1296 format=%(levelname)s ++ %(message)s 1297 datefmt= 1298 """ 1299 1300 # config2 has a subtle configuration error that should be reported 1301 config2 = config1.replace("sys.stdout", "sys.stbout") 1302 1303 # config3 has a less subtle configuration error 1304 config3 = config1.replace("formatter=form1", "formatter=misspelled_name") 1305 1306 # config4 specifies a custom formatter class to be loaded 1307 config4 = """ 1308 [loggers] 1309 keys=root 1310 1311 [handlers] 1312 keys=hand1 1313 1314 [formatters] 1315 keys=form1 1316 1317 [logger_root] 1318 level=NOTSET 1319 handlers=hand1 1320 1321 [handler_hand1] 1322 class=StreamHandler 1323 level=NOTSET 1324 formatter=form1 1325 args=(sys.stdout,) 1326 1327 [formatter_form1] 1328 class=""" + __name__ + """.ExceptionFormatter 1329 format=%(levelname)s:%(name)s:%(message)s 1330 datefmt= 1331 """ 1332 1333 # config5 specifies a custom handler class to be loaded 1334 config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') 1335 1336 # config6 uses ', ' delimiters in the handlers and formatters sections 1337 config6 = """ 1338 [loggers] 1339 keys=root,parser 1340 1341 [handlers] 1342 keys=hand1, hand2 1343 1344 [formatters] 1345 keys=form1, form2 1346 1347 [logger_root] 1348 level=WARNING 1349 handlers= 1350 1351 [logger_parser] 1352 level=DEBUG 1353 handlers=hand1 1354 propagate=1 1355 qualname=compiler.parser 1356 1357 [handler_hand1] 1358 class=StreamHandler 1359 level=NOTSET 1360 formatter=form1 1361 args=(sys.stdout,) 1362 1363 [handler_hand2] 1364 class=StreamHandler 1365 level=NOTSET 1366 formatter=form1 1367 args=(sys.stderr,) 1368 1369 [formatter_form1] 1370 format=%(levelname)s ++ %(message)s 1371 datefmt= 1372 1373 [formatter_form2] 1374 format=%(message)s 1375 datefmt= 1376 """ 1377 1378 # config7 adds a compiler logger, and uses kwargs instead of args. 1379 config7 = """ 1380 [loggers] 1381 keys=root,parser,compiler 1382 1383 [handlers] 1384 keys=hand1 1385 1386 [formatters] 1387 keys=form1 1388 1389 [logger_root] 1390 level=WARNING 1391 handlers=hand1 1392 1393 [logger_compiler] 1394 level=DEBUG 1395 handlers= 1396 propagate=1 1397 qualname=compiler 1398 1399 [logger_parser] 1400 level=DEBUG 1401 handlers= 1402 propagate=1 1403 qualname=compiler.parser 1404 1405 [handler_hand1] 1406 class=StreamHandler 1407 level=NOTSET 1408 formatter=form1 1409 kwargs={'stream': sys.stdout,} 1410 1411 [formatter_form1] 1412 format=%(levelname)s ++ %(message)s 1413 datefmt= 1414 """ 1415 1416 # config 8, check for resource warning 1417 config8 = r""" 1418 [loggers] 1419 keys=root 1420 1421 [handlers] 1422 keys=file 1423 1424 [formatters] 1425 keys= 1426 1427 [logger_root] 1428 level=DEBUG 1429 handlers=file 1430 1431 [handler_file] 1432 class=FileHandler 1433 level=DEBUG 1434 args=("{tempfile}",) 1435 kwargs={{"encoding": "utf-8"}} 1436 """ 1437 1438 disable_test = """ 1439 [loggers] 1440 keys=root 1441 1442 [handlers] 1443 keys=screen 1444 1445 [formatters] 1446 keys= 1447 1448 [logger_root] 1449 level=DEBUG 1450 handlers=screen 1451 1452 [handler_screen] 1453 level=DEBUG 1454 class=StreamHandler 1455 args=(sys.stdout,) 1456 formatter= 1457 """ 1458 1459 def apply_config(self, conf, **kwargs): 1460 file = io.StringIO(textwrap.dedent(conf)) 1461 logging.config.fileConfig(file, encoding="utf-8", **kwargs) 1462 1463 def test_config0_ok(self): 1464 # A simple config file which overrides the default settings. 1465 with support.captured_stdout() as output: 1466 self.apply_config(self.config0) 1467 logger = logging.getLogger() 1468 # Won't output anything 1469 logger.info(self.next_message()) 1470 # Outputs a message 1471 logger.error(self.next_message()) 1472 self.assert_log_lines([ 1473 ('ERROR', '2'), 1474 ], stream=output) 1475 # Original logger output is empty. 1476 self.assert_log_lines([]) 1477 1478 def test_config0_using_cp_ok(self): 1479 # A simple config file which overrides the default settings. 1480 with support.captured_stdout() as output: 1481 file = io.StringIO(textwrap.dedent(self.config0)) 1482 cp = configparser.ConfigParser() 1483 cp.read_file(file) 1484 logging.config.fileConfig(cp) 1485 logger = logging.getLogger() 1486 # Won't output anything 1487 logger.info(self.next_message()) 1488 # Outputs a message 1489 logger.error(self.next_message()) 1490 self.assert_log_lines([ 1491 ('ERROR', '2'), 1492 ], stream=output) 1493 # Original logger output is empty. 1494 self.assert_log_lines([]) 1495 1496 def test_config1_ok(self, config=config1): 1497 # A config file defining a sub-parser as well. 1498 with support.captured_stdout() as output: 1499 self.apply_config(config) 1500 logger = logging.getLogger("compiler.parser") 1501 # Both will output a message 1502 logger.info(self.next_message()) 1503 logger.error(self.next_message()) 1504 self.assert_log_lines([ 1505 ('INFO', '1'), 1506 ('ERROR', '2'), 1507 ], stream=output) 1508 # Original logger output is empty. 1509 self.assert_log_lines([]) 1510 1511 def test_config2_failure(self): 1512 # A simple config file which overrides the default settings. 1513 self.assertRaises(Exception, self.apply_config, self.config2) 1514 1515 def test_config3_failure(self): 1516 # A simple config file which overrides the default settings. 1517 self.assertRaises(Exception, self.apply_config, self.config3) 1518 1519 def test_config4_ok(self): 1520 # A config file specifying a custom formatter class. 1521 with support.captured_stdout() as output: 1522 self.apply_config(self.config4) 1523 logger = logging.getLogger() 1524 try: 1525 raise RuntimeError() 1526 except RuntimeError: 1527 logging.exception("just testing") 1528 sys.stdout.seek(0) 1529 self.assertEqual(output.getvalue(), 1530 "ERROR:root:just testing\nGot a [RuntimeError]\n") 1531 # Original logger output is empty 1532 self.assert_log_lines([]) 1533 1534 def test_config5_ok(self): 1535 self.test_config1_ok(config=self.config5) 1536 1537 def test_config6_ok(self): 1538 self.test_config1_ok(config=self.config6) 1539 1540 def test_config7_ok(self): 1541 with support.captured_stdout() as output: 1542 self.apply_config(self.config1a) 1543 logger = logging.getLogger("compiler.parser") 1544 # See issue #11424. compiler-hyphenated sorts 1545 # between compiler and compiler.xyz and this 1546 # was preventing compiler.xyz from being included 1547 # in the child loggers of compiler because of an 1548 # overzealous loop termination condition. 1549 hyphenated = logging.getLogger('compiler-hyphenated') 1550 # All will output a message 1551 logger.info(self.next_message()) 1552 logger.error(self.next_message()) 1553 hyphenated.critical(self.next_message()) 1554 self.assert_log_lines([ 1555 ('INFO', '1'), 1556 ('ERROR', '2'), 1557 ('CRITICAL', '3'), 1558 ], stream=output) 1559 # Original logger output is empty. 1560 self.assert_log_lines([]) 1561 with support.captured_stdout() as output: 1562 self.apply_config(self.config7) 1563 logger = logging.getLogger("compiler.parser") 1564 self.assertFalse(logger.disabled) 1565 # Both will output a message 1566 logger.info(self.next_message()) 1567 logger.error(self.next_message()) 1568 logger = logging.getLogger("compiler.lexer") 1569 # Both will output a message 1570 logger.info(self.next_message()) 1571 logger.error(self.next_message()) 1572 # Will not appear 1573 hyphenated.critical(self.next_message()) 1574 self.assert_log_lines([ 1575 ('INFO', '4'), 1576 ('ERROR', '5'), 1577 ('INFO', '6'), 1578 ('ERROR', '7'), 1579 ], stream=output) 1580 # Original logger output is empty. 1581 self.assert_log_lines([]) 1582 1583 def test_config8_ok(self): 1584 1585 def cleanup(h1, fn): 1586 h1.close() 1587 os.remove(fn) 1588 1589 with self.check_no_resource_warning(): 1590 fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 1591 os.close(fd) 1592 1593 # Replace single backslash with double backslash in windows 1594 # to avoid unicode error during string formatting 1595 if os.name == "nt": 1596 fn = fn.replace("\\", "\\\\") 1597 1598 config8 = self.config8.format(tempfile=fn) 1599 1600 self.apply_config(config8) 1601 self.apply_config(config8) 1602 1603 handler = logging.root.handlers[0] 1604 self.addCleanup(cleanup, handler, fn) 1605 1606 def test_logger_disabling(self): 1607 self.apply_config(self.disable_test) 1608 logger = logging.getLogger('some_pristine_logger') 1609 self.assertFalse(logger.disabled) 1610 self.apply_config(self.disable_test) 1611 self.assertTrue(logger.disabled) 1612 self.apply_config(self.disable_test, disable_existing_loggers=False) 1613 self.assertFalse(logger.disabled) 1614 1615 def test_config_set_handler_names(self): 1616 test_config = """ 1617 [loggers] 1618 keys=root 1619 1620 [handlers] 1621 keys=hand1 1622 1623 [formatters] 1624 keys=form1 1625 1626 [logger_root] 1627 handlers=hand1 1628 1629 [handler_hand1] 1630 class=StreamHandler 1631 formatter=form1 1632 1633 [formatter_form1] 1634 format=%(levelname)s ++ %(message)s 1635 """ 1636 self.apply_config(test_config) 1637 self.assertEqual(logging.getLogger().handlers[0].name, 'hand1') 1638 1639 def test_defaults_do_no_interpolation(self): 1640 """bpo-33802 defaults should not get interpolated""" 1641 ini = textwrap.dedent(""" 1642 [formatters] 1643 keys=default 1644 1645 [formatter_default] 1646 1647 [handlers] 1648 keys=console 1649 1650 [handler_console] 1651 class=logging.StreamHandler 1652 args=tuple() 1653 1654 [loggers] 1655 keys=root 1656 1657 [logger_root] 1658 formatter=default 1659 handlers=console 1660 """).strip() 1661 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini') 1662 try: 1663 os.write(fd, ini.encode('ascii')) 1664 os.close(fd) 1665 logging.config.fileConfig( 1666 fn, 1667 encoding="utf-8", 1668 defaults=dict( 1669 version=1, 1670 disable_existing_loggers=False, 1671 formatters={ 1672 "generic": { 1673 "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s", 1674 "datefmt": "[%Y-%m-%d %H:%M:%S %z]", 1675 "class": "logging.Formatter" 1676 }, 1677 }, 1678 ) 1679 ) 1680 finally: 1681 os.unlink(fn) 1682 1683 1684class SocketHandlerTest(BaseTest): 1685 1686 """Test for SocketHandler objects.""" 1687 1688 server_class = TestTCPServer 1689 address = ('localhost', 0) 1690 1691 def setUp(self): 1692 """Set up a TCP server to receive log messages, and a SocketHandler 1693 pointing to that server's address and port.""" 1694 BaseTest.setUp(self) 1695 # Issue #29177: deal with errors that happen during setup 1696 self.server = self.sock_hdlr = self.server_exception = None 1697 try: 1698 self.server = server = self.server_class(self.address, 1699 self.handle_socket, 0.01) 1700 server.start() 1701 # Uncomment next line to test error recovery in setUp() 1702 # raise OSError('dummy error raised') 1703 except OSError as e: 1704 self.server_exception = e 1705 return 1706 server.ready.wait() 1707 hcls = logging.handlers.SocketHandler 1708 if isinstance(server.server_address, tuple): 1709 self.sock_hdlr = hcls('localhost', server.port) 1710 else: 1711 self.sock_hdlr = hcls(server.server_address, None) 1712 self.log_output = '' 1713 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1714 self.root_logger.addHandler(self.sock_hdlr) 1715 self.handled = threading.Semaphore(0) 1716 1717 def tearDown(self): 1718 """Shutdown the TCP server.""" 1719 try: 1720 if self.sock_hdlr: 1721 self.root_logger.removeHandler(self.sock_hdlr) 1722 self.sock_hdlr.close() 1723 if self.server: 1724 self.server.stop() 1725 finally: 1726 BaseTest.tearDown(self) 1727 1728 def handle_socket(self, request): 1729 conn = request.connection 1730 while True: 1731 chunk = conn.recv(4) 1732 if len(chunk) < 4: 1733 break 1734 slen = struct.unpack(">L", chunk)[0] 1735 chunk = conn.recv(slen) 1736 while len(chunk) < slen: 1737 chunk = chunk + conn.recv(slen - len(chunk)) 1738 obj = pickle.loads(chunk) 1739 record = logging.makeLogRecord(obj) 1740 self.log_output += record.msg + '\n' 1741 self.handled.release() 1742 1743 def test_output(self): 1744 # The log message sent to the SocketHandler is properly received. 1745 if self.server_exception: 1746 self.skipTest(self.server_exception) 1747 logger = logging.getLogger("tcp") 1748 logger.error("spam") 1749 self.handled.acquire() 1750 logger.debug("eggs") 1751 self.handled.acquire() 1752 self.assertEqual(self.log_output, "spam\neggs\n") 1753 1754 def test_noserver(self): 1755 if self.server_exception: 1756 self.skipTest(self.server_exception) 1757 # Avoid timing-related failures due to SocketHandler's own hard-wired 1758 # one-second timeout on socket.create_connection() (issue #16264). 1759 self.sock_hdlr.retryStart = 2.5 1760 # Kill the server 1761 self.server.stop() 1762 # The logging call should try to connect, which should fail 1763 try: 1764 raise RuntimeError('Deliberate mistake') 1765 except RuntimeError: 1766 self.root_logger.exception('Never sent') 1767 self.root_logger.error('Never sent, either') 1768 now = time.time() 1769 self.assertGreater(self.sock_hdlr.retryTime, now) 1770 time.sleep(self.sock_hdlr.retryTime - now + 0.001) 1771 self.root_logger.error('Nor this') 1772 1773def _get_temp_domain_socket(): 1774 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock') 1775 os.close(fd) 1776 # just need a name - file can't be present, or we'll get an 1777 # 'address already in use' error. 1778 os.remove(fn) 1779 return fn 1780 1781@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1782class UnixSocketHandlerTest(SocketHandlerTest): 1783 1784 """Test for SocketHandler with unix sockets.""" 1785 1786 if hasattr(socket, "AF_UNIX"): 1787 server_class = TestUnixStreamServer 1788 1789 def setUp(self): 1790 # override the definition in the base class 1791 self.address = _get_temp_domain_socket() 1792 SocketHandlerTest.setUp(self) 1793 1794 def tearDown(self): 1795 SocketHandlerTest.tearDown(self) 1796 os_helper.unlink(self.address) 1797 1798class DatagramHandlerTest(BaseTest): 1799 1800 """Test for DatagramHandler.""" 1801 1802 server_class = TestUDPServer 1803 address = ('localhost', 0) 1804 1805 def setUp(self): 1806 """Set up a UDP server to receive log messages, and a DatagramHandler 1807 pointing to that server's address and port.""" 1808 BaseTest.setUp(self) 1809 # Issue #29177: deal with errors that happen during setup 1810 self.server = self.sock_hdlr = self.server_exception = None 1811 try: 1812 self.server = server = self.server_class(self.address, 1813 self.handle_datagram, 0.01) 1814 server.start() 1815 # Uncomment next line to test error recovery in setUp() 1816 # raise OSError('dummy error raised') 1817 except OSError as e: 1818 self.server_exception = e 1819 return 1820 server.ready.wait() 1821 hcls = logging.handlers.DatagramHandler 1822 if isinstance(server.server_address, tuple): 1823 self.sock_hdlr = hcls('localhost', server.port) 1824 else: 1825 self.sock_hdlr = hcls(server.server_address, None) 1826 self.log_output = '' 1827 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1828 self.root_logger.addHandler(self.sock_hdlr) 1829 self.handled = threading.Event() 1830 1831 def tearDown(self): 1832 """Shutdown the UDP server.""" 1833 try: 1834 if self.server: 1835 self.server.stop() 1836 if self.sock_hdlr: 1837 self.root_logger.removeHandler(self.sock_hdlr) 1838 self.sock_hdlr.close() 1839 finally: 1840 BaseTest.tearDown(self) 1841 1842 def handle_datagram(self, request): 1843 slen = struct.pack('>L', 0) # length of prefix 1844 packet = request.packet[len(slen):] 1845 obj = pickle.loads(packet) 1846 record = logging.makeLogRecord(obj) 1847 self.log_output += record.msg + '\n' 1848 self.handled.set() 1849 1850 def test_output(self): 1851 # The log message sent to the DatagramHandler is properly received. 1852 if self.server_exception: 1853 self.skipTest(self.server_exception) 1854 logger = logging.getLogger("udp") 1855 logger.error("spam") 1856 self.handled.wait() 1857 self.handled.clear() 1858 logger.error("eggs") 1859 self.handled.wait() 1860 self.assertEqual(self.log_output, "spam\neggs\n") 1861 1862@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1863class UnixDatagramHandlerTest(DatagramHandlerTest): 1864 1865 """Test for DatagramHandler using Unix sockets.""" 1866 1867 if hasattr(socket, "AF_UNIX"): 1868 server_class = TestUnixDatagramServer 1869 1870 def setUp(self): 1871 # override the definition in the base class 1872 self.address = _get_temp_domain_socket() 1873 DatagramHandlerTest.setUp(self) 1874 1875 def tearDown(self): 1876 DatagramHandlerTest.tearDown(self) 1877 os_helper.unlink(self.address) 1878 1879class SysLogHandlerTest(BaseTest): 1880 1881 """Test for SysLogHandler using UDP.""" 1882 1883 server_class = TestUDPServer 1884 address = ('localhost', 0) 1885 1886 def setUp(self): 1887 """Set up a UDP server to receive log messages, and a SysLogHandler 1888 pointing to that server's address and port.""" 1889 BaseTest.setUp(self) 1890 # Issue #29177: deal with errors that happen during setup 1891 self.server = self.sl_hdlr = self.server_exception = None 1892 try: 1893 self.server = server = self.server_class(self.address, 1894 self.handle_datagram, 0.01) 1895 server.start() 1896 # Uncomment next line to test error recovery in setUp() 1897 # raise OSError('dummy error raised') 1898 except OSError as e: 1899 self.server_exception = e 1900 return 1901 server.ready.wait() 1902 hcls = logging.handlers.SysLogHandler 1903 if isinstance(server.server_address, tuple): 1904 self.sl_hdlr = hcls((server.server_address[0], server.port)) 1905 else: 1906 self.sl_hdlr = hcls(server.server_address) 1907 self.log_output = '' 1908 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1909 self.root_logger.addHandler(self.sl_hdlr) 1910 self.handled = threading.Event() 1911 1912 def tearDown(self): 1913 """Shutdown the server.""" 1914 try: 1915 if self.server: 1916 self.server.stop() 1917 if self.sl_hdlr: 1918 self.root_logger.removeHandler(self.sl_hdlr) 1919 self.sl_hdlr.close() 1920 finally: 1921 BaseTest.tearDown(self) 1922 1923 def handle_datagram(self, request): 1924 self.log_output = request.packet 1925 self.handled.set() 1926 1927 def test_output(self): 1928 if self.server_exception: 1929 self.skipTest(self.server_exception) 1930 # The log message sent to the SysLogHandler is properly received. 1931 logger = logging.getLogger("slh") 1932 logger.error("sp\xe4m") 1933 self.handled.wait() 1934 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 1935 self.handled.clear() 1936 self.sl_hdlr.append_nul = False 1937 logger.error("sp\xe4m") 1938 self.handled.wait() 1939 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') 1940 self.handled.clear() 1941 self.sl_hdlr.ident = "h\xe4m-" 1942 logger.error("sp\xe4m") 1943 self.handled.wait() 1944 self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') 1945 1946@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1947class UnixSysLogHandlerTest(SysLogHandlerTest): 1948 1949 """Test for SysLogHandler with Unix sockets.""" 1950 1951 if hasattr(socket, "AF_UNIX"): 1952 server_class = TestUnixDatagramServer 1953 1954 def setUp(self): 1955 # override the definition in the base class 1956 self.address = _get_temp_domain_socket() 1957 SysLogHandlerTest.setUp(self) 1958 1959 def tearDown(self): 1960 SysLogHandlerTest.tearDown(self) 1961 os_helper.unlink(self.address) 1962 1963@unittest.skipUnless(socket_helper.IPV6_ENABLED, 1964 'IPv6 support required for this test.') 1965class IPv6SysLogHandlerTest(SysLogHandlerTest): 1966 1967 """Test for SysLogHandler with IPv6 host.""" 1968 1969 server_class = TestUDPServer 1970 address = ('::1', 0) 1971 1972 def setUp(self): 1973 self.server_class.address_family = socket.AF_INET6 1974 super(IPv6SysLogHandlerTest, self).setUp() 1975 1976 def tearDown(self): 1977 self.server_class.address_family = socket.AF_INET 1978 super(IPv6SysLogHandlerTest, self).tearDown() 1979 1980class HTTPHandlerTest(BaseTest): 1981 """Test for HTTPHandler.""" 1982 1983 def setUp(self): 1984 """Set up an HTTP server to receive log messages, and a HTTPHandler 1985 pointing to that server's address and port.""" 1986 BaseTest.setUp(self) 1987 self.handled = threading.Event() 1988 1989 def handle_request(self, request): 1990 self.command = request.command 1991 self.log_data = urlparse(request.path) 1992 if self.command == 'POST': 1993 try: 1994 rlen = int(request.headers['Content-Length']) 1995 self.post_data = request.rfile.read(rlen) 1996 except: 1997 self.post_data = None 1998 request.send_response(200) 1999 request.end_headers() 2000 self.handled.set() 2001 2002 def test_output(self): 2003 # The log message sent to the HTTPHandler is properly received. 2004 logger = logging.getLogger("http") 2005 root_logger = self.root_logger 2006 root_logger.removeHandler(self.root_logger.handlers[0]) 2007 for secure in (False, True): 2008 addr = ('localhost', 0) 2009 if secure: 2010 try: 2011 import ssl 2012 except ImportError: 2013 sslctx = None 2014 else: 2015 here = os.path.dirname(__file__) 2016 localhost_cert = os.path.join(here, "keycert.pem") 2017 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 2018 sslctx.load_cert_chain(localhost_cert) 2019 2020 context = ssl.create_default_context(cafile=localhost_cert) 2021 else: 2022 sslctx = None 2023 context = None 2024 self.server = server = TestHTTPServer(addr, self.handle_request, 2025 0.01, sslctx=sslctx) 2026 server.start() 2027 server.ready.wait() 2028 host = 'localhost:%d' % server.server_port 2029 secure_client = secure and sslctx 2030 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 2031 secure=secure_client, 2032 context=context, 2033 credentials=('foo', 'bar')) 2034 self.log_data = None 2035 root_logger.addHandler(self.h_hdlr) 2036 2037 for method in ('GET', 'POST'): 2038 self.h_hdlr.method = method 2039 self.handled.clear() 2040 msg = "sp\xe4m" 2041 logger.error(msg) 2042 self.handled.wait() 2043 self.assertEqual(self.log_data.path, '/frob') 2044 self.assertEqual(self.command, method) 2045 if method == 'GET': 2046 d = parse_qs(self.log_data.query) 2047 else: 2048 d = parse_qs(self.post_data.decode('utf-8')) 2049 self.assertEqual(d['name'], ['http']) 2050 self.assertEqual(d['funcName'], ['test_output']) 2051 self.assertEqual(d['msg'], [msg]) 2052 2053 self.server.stop() 2054 self.root_logger.removeHandler(self.h_hdlr) 2055 self.h_hdlr.close() 2056 2057class MemoryTest(BaseTest): 2058 2059 """Test memory persistence of logger objects.""" 2060 2061 def setUp(self): 2062 """Create a dict to remember potentially destroyed objects.""" 2063 BaseTest.setUp(self) 2064 self._survivors = {} 2065 2066 def _watch_for_survival(self, *args): 2067 """Watch the given objects for survival, by creating weakrefs to 2068 them.""" 2069 for obj in args: 2070 key = id(obj), repr(obj) 2071 self._survivors[key] = weakref.ref(obj) 2072 2073 def _assertTruesurvival(self): 2074 """Assert that all objects watched for survival have survived.""" 2075 # Trigger cycle breaking. 2076 gc.collect() 2077 dead = [] 2078 for (id_, repr_), ref in self._survivors.items(): 2079 if ref() is None: 2080 dead.append(repr_) 2081 if dead: 2082 self.fail("%d objects should have survived " 2083 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 2084 2085 def test_persistent_loggers(self): 2086 # Logger objects are persistent and retain their configuration, even 2087 # if visible references are destroyed. 2088 self.root_logger.setLevel(logging.INFO) 2089 foo = logging.getLogger("foo") 2090 self._watch_for_survival(foo) 2091 foo.setLevel(logging.DEBUG) 2092 self.root_logger.debug(self.next_message()) 2093 foo.debug(self.next_message()) 2094 self.assert_log_lines([ 2095 ('foo', 'DEBUG', '2'), 2096 ]) 2097 del foo 2098 # foo has survived. 2099 self._assertTruesurvival() 2100 # foo has retained its settings. 2101 bar = logging.getLogger("foo") 2102 bar.debug(self.next_message()) 2103 self.assert_log_lines([ 2104 ('foo', 'DEBUG', '2'), 2105 ('foo', 'DEBUG', '3'), 2106 ]) 2107 2108 2109class EncodingTest(BaseTest): 2110 def test_encoding_plain_file(self): 2111 # In Python 2.x, a plain file object is treated as having no encoding. 2112 log = logging.getLogger("test") 2113 fd, fn = tempfile.mkstemp(".log", "test_logging-1-") 2114 os.close(fd) 2115 # the non-ascii data we write to the log. 2116 data = "foo\x80" 2117 try: 2118 handler = logging.FileHandler(fn, encoding="utf-8") 2119 log.addHandler(handler) 2120 try: 2121 # write non-ascii data to the log. 2122 log.warning(data) 2123 finally: 2124 log.removeHandler(handler) 2125 handler.close() 2126 # check we wrote exactly those bytes, ignoring trailing \n etc 2127 f = open(fn, encoding="utf-8") 2128 try: 2129 self.assertEqual(f.read().rstrip(), data) 2130 finally: 2131 f.close() 2132 finally: 2133 if os.path.isfile(fn): 2134 os.remove(fn) 2135 2136 def test_encoding_cyrillic_unicode(self): 2137 log = logging.getLogger("test") 2138 # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) 2139 message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' 2140 # Ensure it's written in a Cyrillic encoding 2141 writer_class = codecs.getwriter('cp1251') 2142 writer_class.encoding = 'cp1251' 2143 stream = io.BytesIO() 2144 writer = writer_class(stream, 'strict') 2145 handler = logging.StreamHandler(writer) 2146 log.addHandler(handler) 2147 try: 2148 log.warning(message) 2149 finally: 2150 log.removeHandler(handler) 2151 handler.close() 2152 # check we wrote exactly those bytes, ignoring trailing \n etc 2153 s = stream.getvalue() 2154 # Compare against what the data should be when encoded in CP-1251 2155 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') 2156 2157 2158class WarningsTest(BaseTest): 2159 2160 def test_warnings(self): 2161 with warnings.catch_warnings(): 2162 logging.captureWarnings(True) 2163 self.addCleanup(logging.captureWarnings, False) 2164 warnings.filterwarnings("always", category=UserWarning) 2165 stream = io.StringIO() 2166 h = logging.StreamHandler(stream) 2167 logger = logging.getLogger("py.warnings") 2168 logger.addHandler(h) 2169 warnings.warn("I'm warning you...") 2170 logger.removeHandler(h) 2171 s = stream.getvalue() 2172 h.close() 2173 self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) 2174 2175 # See if an explicit file uses the original implementation 2176 a_file = io.StringIO() 2177 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, 2178 a_file, "Dummy line") 2179 s = a_file.getvalue() 2180 a_file.close() 2181 self.assertEqual(s, 2182 "dummy.py:42: UserWarning: Explicit\n Dummy line\n") 2183 2184 def test_warnings_no_handlers(self): 2185 with warnings.catch_warnings(): 2186 logging.captureWarnings(True) 2187 self.addCleanup(logging.captureWarnings, False) 2188 2189 # confirm our assumption: no loggers are set 2190 logger = logging.getLogger("py.warnings") 2191 self.assertEqual(logger.handlers, []) 2192 2193 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) 2194 self.assertEqual(len(logger.handlers), 1) 2195 self.assertIsInstance(logger.handlers[0], logging.NullHandler) 2196 2197 2198def formatFunc(format, datefmt=None): 2199 return logging.Formatter(format, datefmt) 2200 2201class myCustomFormatter: 2202 def __init__(self, fmt, datefmt=None): 2203 pass 2204 2205def handlerFunc(): 2206 return logging.StreamHandler() 2207 2208class CustomHandler(logging.StreamHandler): 2209 pass 2210 2211class ConfigDictTest(BaseTest): 2212 2213 """Reading logging config from a dictionary.""" 2214 2215 check_no_resource_warning = warnings_helper.check_no_resource_warning 2216 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 2217 2218 # config0 is a standard configuration. 2219 config0 = { 2220 'version': 1, 2221 'formatters': { 2222 'form1' : { 2223 'format' : '%(levelname)s ++ %(message)s', 2224 }, 2225 }, 2226 'handlers' : { 2227 'hand1' : { 2228 'class' : 'logging.StreamHandler', 2229 'formatter' : 'form1', 2230 'level' : 'NOTSET', 2231 'stream' : 'ext://sys.stdout', 2232 }, 2233 }, 2234 'root' : { 2235 'level' : 'WARNING', 2236 'handlers' : ['hand1'], 2237 }, 2238 } 2239 2240 # config1 adds a little to the standard configuration. 2241 config1 = { 2242 'version': 1, 2243 'formatters': { 2244 'form1' : { 2245 'format' : '%(levelname)s ++ %(message)s', 2246 }, 2247 }, 2248 'handlers' : { 2249 'hand1' : { 2250 'class' : 'logging.StreamHandler', 2251 'formatter' : 'form1', 2252 'level' : 'NOTSET', 2253 'stream' : 'ext://sys.stdout', 2254 }, 2255 }, 2256 'loggers' : { 2257 'compiler.parser' : { 2258 'level' : 'DEBUG', 2259 'handlers' : ['hand1'], 2260 }, 2261 }, 2262 'root' : { 2263 'level' : 'WARNING', 2264 }, 2265 } 2266 2267 # config1a moves the handler to the root. Used with config8a 2268 config1a = { 2269 'version': 1, 2270 'formatters': { 2271 'form1' : { 2272 'format' : '%(levelname)s ++ %(message)s', 2273 }, 2274 }, 2275 'handlers' : { 2276 'hand1' : { 2277 'class' : 'logging.StreamHandler', 2278 'formatter' : 'form1', 2279 'level' : 'NOTSET', 2280 'stream' : 'ext://sys.stdout', 2281 }, 2282 }, 2283 'loggers' : { 2284 'compiler.parser' : { 2285 'level' : 'DEBUG', 2286 }, 2287 }, 2288 'root' : { 2289 'level' : 'WARNING', 2290 'handlers' : ['hand1'], 2291 }, 2292 } 2293 2294 # config2 has a subtle configuration error that should be reported 2295 config2 = { 2296 'version': 1, 2297 'formatters': { 2298 'form1' : { 2299 'format' : '%(levelname)s ++ %(message)s', 2300 }, 2301 }, 2302 'handlers' : { 2303 'hand1' : { 2304 'class' : 'logging.StreamHandler', 2305 'formatter' : 'form1', 2306 'level' : 'NOTSET', 2307 'stream' : 'ext://sys.stdbout', 2308 }, 2309 }, 2310 'loggers' : { 2311 'compiler.parser' : { 2312 'level' : 'DEBUG', 2313 'handlers' : ['hand1'], 2314 }, 2315 }, 2316 'root' : { 2317 'level' : 'WARNING', 2318 }, 2319 } 2320 2321 # As config1 but with a misspelt level on a handler 2322 config2a = { 2323 'version': 1, 2324 'formatters': { 2325 'form1' : { 2326 'format' : '%(levelname)s ++ %(message)s', 2327 }, 2328 }, 2329 'handlers' : { 2330 'hand1' : { 2331 'class' : 'logging.StreamHandler', 2332 'formatter' : 'form1', 2333 'level' : 'NTOSET', 2334 'stream' : 'ext://sys.stdout', 2335 }, 2336 }, 2337 'loggers' : { 2338 'compiler.parser' : { 2339 'level' : 'DEBUG', 2340 'handlers' : ['hand1'], 2341 }, 2342 }, 2343 'root' : { 2344 'level' : 'WARNING', 2345 }, 2346 } 2347 2348 2349 # As config1 but with a misspelt level on a logger 2350 config2b = { 2351 'version': 1, 2352 'formatters': { 2353 'form1' : { 2354 'format' : '%(levelname)s ++ %(message)s', 2355 }, 2356 }, 2357 'handlers' : { 2358 'hand1' : { 2359 'class' : 'logging.StreamHandler', 2360 'formatter' : 'form1', 2361 'level' : 'NOTSET', 2362 'stream' : 'ext://sys.stdout', 2363 }, 2364 }, 2365 'loggers' : { 2366 'compiler.parser' : { 2367 'level' : 'DEBUG', 2368 'handlers' : ['hand1'], 2369 }, 2370 }, 2371 'root' : { 2372 'level' : 'WRANING', 2373 }, 2374 } 2375 2376 # config3 has a less subtle configuration error 2377 config3 = { 2378 'version': 1, 2379 'formatters': { 2380 'form1' : { 2381 'format' : '%(levelname)s ++ %(message)s', 2382 }, 2383 }, 2384 'handlers' : { 2385 'hand1' : { 2386 'class' : 'logging.StreamHandler', 2387 'formatter' : 'misspelled_name', 2388 'level' : 'NOTSET', 2389 'stream' : 'ext://sys.stdout', 2390 }, 2391 }, 2392 'loggers' : { 2393 'compiler.parser' : { 2394 'level' : 'DEBUG', 2395 'handlers' : ['hand1'], 2396 }, 2397 }, 2398 'root' : { 2399 'level' : 'WARNING', 2400 }, 2401 } 2402 2403 # config4 specifies a custom formatter class to be loaded 2404 config4 = { 2405 'version': 1, 2406 'formatters': { 2407 'form1' : { 2408 '()' : __name__ + '.ExceptionFormatter', 2409 'format' : '%(levelname)s:%(name)s:%(message)s', 2410 }, 2411 }, 2412 'handlers' : { 2413 'hand1' : { 2414 'class' : 'logging.StreamHandler', 2415 'formatter' : 'form1', 2416 'level' : 'NOTSET', 2417 'stream' : 'ext://sys.stdout', 2418 }, 2419 }, 2420 'root' : { 2421 'level' : 'NOTSET', 2422 'handlers' : ['hand1'], 2423 }, 2424 } 2425 2426 # As config4 but using an actual callable rather than a string 2427 config4a = { 2428 'version': 1, 2429 'formatters': { 2430 'form1' : { 2431 '()' : ExceptionFormatter, 2432 'format' : '%(levelname)s:%(name)s:%(message)s', 2433 }, 2434 'form2' : { 2435 '()' : __name__ + '.formatFunc', 2436 'format' : '%(levelname)s:%(name)s:%(message)s', 2437 }, 2438 'form3' : { 2439 '()' : formatFunc, 2440 'format' : '%(levelname)s:%(name)s:%(message)s', 2441 }, 2442 }, 2443 'handlers' : { 2444 'hand1' : { 2445 'class' : 'logging.StreamHandler', 2446 'formatter' : 'form1', 2447 'level' : 'NOTSET', 2448 'stream' : 'ext://sys.stdout', 2449 }, 2450 'hand2' : { 2451 '()' : handlerFunc, 2452 }, 2453 }, 2454 'root' : { 2455 'level' : 'NOTSET', 2456 'handlers' : ['hand1'], 2457 }, 2458 } 2459 2460 # config5 specifies a custom handler class to be loaded 2461 config5 = { 2462 'version': 1, 2463 'formatters': { 2464 'form1' : { 2465 'format' : '%(levelname)s ++ %(message)s', 2466 }, 2467 }, 2468 'handlers' : { 2469 'hand1' : { 2470 'class' : __name__ + '.CustomHandler', 2471 'formatter' : 'form1', 2472 'level' : 'NOTSET', 2473 'stream' : 'ext://sys.stdout', 2474 }, 2475 }, 2476 'loggers' : { 2477 'compiler.parser' : { 2478 'level' : 'DEBUG', 2479 'handlers' : ['hand1'], 2480 }, 2481 }, 2482 'root' : { 2483 'level' : 'WARNING', 2484 }, 2485 } 2486 2487 # config6 specifies a custom handler class to be loaded 2488 # but has bad arguments 2489 config6 = { 2490 'version': 1, 2491 'formatters': { 2492 'form1' : { 2493 'format' : '%(levelname)s ++ %(message)s', 2494 }, 2495 }, 2496 'handlers' : { 2497 'hand1' : { 2498 'class' : __name__ + '.CustomHandler', 2499 'formatter' : 'form1', 2500 'level' : 'NOTSET', 2501 'stream' : 'ext://sys.stdout', 2502 '9' : 'invalid parameter name', 2503 }, 2504 }, 2505 'loggers' : { 2506 'compiler.parser' : { 2507 'level' : 'DEBUG', 2508 'handlers' : ['hand1'], 2509 }, 2510 }, 2511 'root' : { 2512 'level' : 'WARNING', 2513 }, 2514 } 2515 2516 # config 7 does not define compiler.parser but defines compiler.lexer 2517 # so compiler.parser should be disabled after applying it 2518 config7 = { 2519 'version': 1, 2520 'formatters': { 2521 'form1' : { 2522 'format' : '%(levelname)s ++ %(message)s', 2523 }, 2524 }, 2525 'handlers' : { 2526 'hand1' : { 2527 'class' : 'logging.StreamHandler', 2528 'formatter' : 'form1', 2529 'level' : 'NOTSET', 2530 'stream' : 'ext://sys.stdout', 2531 }, 2532 }, 2533 'loggers' : { 2534 'compiler.lexer' : { 2535 'level' : 'DEBUG', 2536 'handlers' : ['hand1'], 2537 }, 2538 }, 2539 'root' : { 2540 'level' : 'WARNING', 2541 }, 2542 } 2543 2544 # config8 defines both compiler and compiler.lexer 2545 # so compiler.parser should not be disabled (since 2546 # compiler is defined) 2547 config8 = { 2548 'version': 1, 2549 'disable_existing_loggers' : False, 2550 'formatters': { 2551 'form1' : { 2552 'format' : '%(levelname)s ++ %(message)s', 2553 }, 2554 }, 2555 'handlers' : { 2556 'hand1' : { 2557 'class' : 'logging.StreamHandler', 2558 'formatter' : 'form1', 2559 'level' : 'NOTSET', 2560 'stream' : 'ext://sys.stdout', 2561 }, 2562 }, 2563 'loggers' : { 2564 'compiler' : { 2565 'level' : 'DEBUG', 2566 'handlers' : ['hand1'], 2567 }, 2568 'compiler.lexer' : { 2569 }, 2570 }, 2571 'root' : { 2572 'level' : 'WARNING', 2573 }, 2574 } 2575 2576 # config8a disables existing loggers 2577 config8a = { 2578 'version': 1, 2579 'disable_existing_loggers' : True, 2580 'formatters': { 2581 'form1' : { 2582 'format' : '%(levelname)s ++ %(message)s', 2583 }, 2584 }, 2585 'handlers' : { 2586 'hand1' : { 2587 'class' : 'logging.StreamHandler', 2588 'formatter' : 'form1', 2589 'level' : 'NOTSET', 2590 'stream' : 'ext://sys.stdout', 2591 }, 2592 }, 2593 'loggers' : { 2594 'compiler' : { 2595 'level' : 'DEBUG', 2596 'handlers' : ['hand1'], 2597 }, 2598 'compiler.lexer' : { 2599 }, 2600 }, 2601 'root' : { 2602 'level' : 'WARNING', 2603 }, 2604 } 2605 2606 config9 = { 2607 'version': 1, 2608 'formatters': { 2609 'form1' : { 2610 'format' : '%(levelname)s ++ %(message)s', 2611 }, 2612 }, 2613 'handlers' : { 2614 'hand1' : { 2615 'class' : 'logging.StreamHandler', 2616 'formatter' : 'form1', 2617 'level' : 'WARNING', 2618 'stream' : 'ext://sys.stdout', 2619 }, 2620 }, 2621 'loggers' : { 2622 'compiler.parser' : { 2623 'level' : 'WARNING', 2624 'handlers' : ['hand1'], 2625 }, 2626 }, 2627 'root' : { 2628 'level' : 'NOTSET', 2629 }, 2630 } 2631 2632 config9a = { 2633 'version': 1, 2634 'incremental' : True, 2635 'handlers' : { 2636 'hand1' : { 2637 'level' : 'WARNING', 2638 }, 2639 }, 2640 'loggers' : { 2641 'compiler.parser' : { 2642 'level' : 'INFO', 2643 }, 2644 }, 2645 } 2646 2647 config9b = { 2648 'version': 1, 2649 'incremental' : True, 2650 'handlers' : { 2651 'hand1' : { 2652 'level' : 'INFO', 2653 }, 2654 }, 2655 'loggers' : { 2656 'compiler.parser' : { 2657 'level' : 'INFO', 2658 }, 2659 }, 2660 } 2661 2662 # As config1 but with a filter added 2663 config10 = { 2664 'version': 1, 2665 'formatters': { 2666 'form1' : { 2667 'format' : '%(levelname)s ++ %(message)s', 2668 }, 2669 }, 2670 'filters' : { 2671 'filt1' : { 2672 'name' : 'compiler.parser', 2673 }, 2674 }, 2675 'handlers' : { 2676 'hand1' : { 2677 'class' : 'logging.StreamHandler', 2678 'formatter' : 'form1', 2679 'level' : 'NOTSET', 2680 'stream' : 'ext://sys.stdout', 2681 'filters' : ['filt1'], 2682 }, 2683 }, 2684 'loggers' : { 2685 'compiler.parser' : { 2686 'level' : 'DEBUG', 2687 'filters' : ['filt1'], 2688 }, 2689 }, 2690 'root' : { 2691 'level' : 'WARNING', 2692 'handlers' : ['hand1'], 2693 }, 2694 } 2695 2696 # As config1 but using cfg:// references 2697 config11 = { 2698 'version': 1, 2699 'true_formatters': { 2700 'form1' : { 2701 'format' : '%(levelname)s ++ %(message)s', 2702 }, 2703 }, 2704 'handler_configs': { 2705 'hand1' : { 2706 'class' : 'logging.StreamHandler', 2707 'formatter' : 'form1', 2708 'level' : 'NOTSET', 2709 'stream' : 'ext://sys.stdout', 2710 }, 2711 }, 2712 'formatters' : 'cfg://true_formatters', 2713 'handlers' : { 2714 'hand1' : 'cfg://handler_configs[hand1]', 2715 }, 2716 'loggers' : { 2717 'compiler.parser' : { 2718 'level' : 'DEBUG', 2719 'handlers' : ['hand1'], 2720 }, 2721 }, 2722 'root' : { 2723 'level' : 'WARNING', 2724 }, 2725 } 2726 2727 # As config11 but missing the version key 2728 config12 = { 2729 'true_formatters': { 2730 'form1' : { 2731 'format' : '%(levelname)s ++ %(message)s', 2732 }, 2733 }, 2734 'handler_configs': { 2735 'hand1' : { 2736 'class' : 'logging.StreamHandler', 2737 'formatter' : 'form1', 2738 'level' : 'NOTSET', 2739 'stream' : 'ext://sys.stdout', 2740 }, 2741 }, 2742 'formatters' : 'cfg://true_formatters', 2743 'handlers' : { 2744 'hand1' : 'cfg://handler_configs[hand1]', 2745 }, 2746 'loggers' : { 2747 'compiler.parser' : { 2748 'level' : 'DEBUG', 2749 'handlers' : ['hand1'], 2750 }, 2751 }, 2752 'root' : { 2753 'level' : 'WARNING', 2754 }, 2755 } 2756 2757 # As config11 but using an unsupported version 2758 config13 = { 2759 'version': 2, 2760 'true_formatters': { 2761 'form1' : { 2762 'format' : '%(levelname)s ++ %(message)s', 2763 }, 2764 }, 2765 'handler_configs': { 2766 'hand1' : { 2767 'class' : 'logging.StreamHandler', 2768 'formatter' : 'form1', 2769 'level' : 'NOTSET', 2770 'stream' : 'ext://sys.stdout', 2771 }, 2772 }, 2773 'formatters' : 'cfg://true_formatters', 2774 'handlers' : { 2775 'hand1' : 'cfg://handler_configs[hand1]', 2776 }, 2777 'loggers' : { 2778 'compiler.parser' : { 2779 'level' : 'DEBUG', 2780 'handlers' : ['hand1'], 2781 }, 2782 }, 2783 'root' : { 2784 'level' : 'WARNING', 2785 }, 2786 } 2787 2788 # As config0, but with properties 2789 config14 = { 2790 'version': 1, 2791 'formatters': { 2792 'form1' : { 2793 'format' : '%(levelname)s ++ %(message)s', 2794 }, 2795 }, 2796 'handlers' : { 2797 'hand1' : { 2798 'class' : 'logging.StreamHandler', 2799 'formatter' : 'form1', 2800 'level' : 'NOTSET', 2801 'stream' : 'ext://sys.stdout', 2802 '.': { 2803 'foo': 'bar', 2804 'terminator': '!\n', 2805 } 2806 }, 2807 }, 2808 'root' : { 2809 'level' : 'WARNING', 2810 'handlers' : ['hand1'], 2811 }, 2812 } 2813 2814 out_of_order = { 2815 "version": 1, 2816 "formatters": { 2817 "mySimpleFormatter": { 2818 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2819 "style": "$" 2820 } 2821 }, 2822 "handlers": { 2823 "fileGlobal": { 2824 "class": "logging.StreamHandler", 2825 "level": "DEBUG", 2826 "formatter": "mySimpleFormatter" 2827 }, 2828 "bufferGlobal": { 2829 "class": "logging.handlers.MemoryHandler", 2830 "capacity": 5, 2831 "formatter": "mySimpleFormatter", 2832 "target": "fileGlobal", 2833 "level": "DEBUG" 2834 } 2835 }, 2836 "loggers": { 2837 "mymodule": { 2838 "level": "DEBUG", 2839 "handlers": ["bufferGlobal"], 2840 "propagate": "true" 2841 } 2842 } 2843 } 2844 2845 # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False 2846 custom_formatter_class_validate = { 2847 'version': 1, 2848 'formatters': { 2849 'form1': { 2850 '()': __name__ + '.ExceptionFormatter', 2851 'format': '%(levelname)s:%(name)s:%(message)s', 2852 'validate': False, 2853 }, 2854 }, 2855 'handlers' : { 2856 'hand1' : { 2857 'class': 'logging.StreamHandler', 2858 'formatter': 'form1', 2859 'level': 'NOTSET', 2860 'stream': 'ext://sys.stdout', 2861 }, 2862 }, 2863 "loggers": { 2864 "my_test_logger_custom_formatter": { 2865 "level": "DEBUG", 2866 "handlers": ["hand1"], 2867 "propagate": "true" 2868 } 2869 } 2870 } 2871 2872 # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False 2873 custom_formatter_class_validate2 = { 2874 'version': 1, 2875 'formatters': { 2876 'form1': { 2877 'class': __name__ + '.ExceptionFormatter', 2878 'format': '%(levelname)s:%(name)s:%(message)s', 2879 'validate': False, 2880 }, 2881 }, 2882 'handlers' : { 2883 'hand1' : { 2884 'class': 'logging.StreamHandler', 2885 'formatter': 'form1', 2886 'level': 'NOTSET', 2887 'stream': 'ext://sys.stdout', 2888 }, 2889 }, 2890 "loggers": { 2891 "my_test_logger_custom_formatter": { 2892 "level": "DEBUG", 2893 "handlers": ["hand1"], 2894 "propagate": "true" 2895 } 2896 } 2897 } 2898 2899 # Configuration with custom class that is not inherited from logging.Formatter 2900 custom_formatter_class_validate3 = { 2901 'version': 1, 2902 'formatters': { 2903 'form1': { 2904 'class': __name__ + '.myCustomFormatter', 2905 'format': '%(levelname)s:%(name)s:%(message)s', 2906 'validate': False, 2907 }, 2908 }, 2909 'handlers' : { 2910 'hand1' : { 2911 'class': 'logging.StreamHandler', 2912 'formatter': 'form1', 2913 'level': 'NOTSET', 2914 'stream': 'ext://sys.stdout', 2915 }, 2916 }, 2917 "loggers": { 2918 "my_test_logger_custom_formatter": { 2919 "level": "DEBUG", 2920 "handlers": ["hand1"], 2921 "propagate": "true" 2922 } 2923 } 2924 } 2925 2926 # Configuration with custom function and 'validate' set to False 2927 custom_formatter_with_function = { 2928 'version': 1, 2929 'formatters': { 2930 'form1': { 2931 '()': formatFunc, 2932 'format': '%(levelname)s:%(name)s:%(message)s', 2933 'validate': False, 2934 }, 2935 }, 2936 'handlers' : { 2937 'hand1' : { 2938 'class': 'logging.StreamHandler', 2939 'formatter': 'form1', 2940 'level': 'NOTSET', 2941 'stream': 'ext://sys.stdout', 2942 }, 2943 }, 2944 "loggers": { 2945 "my_test_logger_custom_formatter": { 2946 "level": "DEBUG", 2947 "handlers": ["hand1"], 2948 "propagate": "true" 2949 } 2950 } 2951 } 2952 2953 def apply_config(self, conf): 2954 logging.config.dictConfig(conf) 2955 2956 def test_config0_ok(self): 2957 # A simple config which overrides the default settings. 2958 with support.captured_stdout() as output: 2959 self.apply_config(self.config0) 2960 logger = logging.getLogger() 2961 # Won't output anything 2962 logger.info(self.next_message()) 2963 # Outputs a message 2964 logger.error(self.next_message()) 2965 self.assert_log_lines([ 2966 ('ERROR', '2'), 2967 ], stream=output) 2968 # Original logger output is empty. 2969 self.assert_log_lines([]) 2970 2971 def test_config1_ok(self, config=config1): 2972 # A config defining a sub-parser as well. 2973 with support.captured_stdout() as output: 2974 self.apply_config(config) 2975 logger = logging.getLogger("compiler.parser") 2976 # Both will output a message 2977 logger.info(self.next_message()) 2978 logger.error(self.next_message()) 2979 self.assert_log_lines([ 2980 ('INFO', '1'), 2981 ('ERROR', '2'), 2982 ], stream=output) 2983 # Original logger output is empty. 2984 self.assert_log_lines([]) 2985 2986 def test_config2_failure(self): 2987 # A simple config which overrides the default settings. 2988 self.assertRaises(Exception, self.apply_config, self.config2) 2989 2990 def test_config2a_failure(self): 2991 # A simple config which overrides the default settings. 2992 self.assertRaises(Exception, self.apply_config, self.config2a) 2993 2994 def test_config2b_failure(self): 2995 # A simple config which overrides the default settings. 2996 self.assertRaises(Exception, self.apply_config, self.config2b) 2997 2998 def test_config3_failure(self): 2999 # A simple config which overrides the default settings. 3000 self.assertRaises(Exception, self.apply_config, self.config3) 3001 3002 def test_config4_ok(self): 3003 # A config specifying a custom formatter class. 3004 with support.captured_stdout() as output: 3005 self.apply_config(self.config4) 3006 #logger = logging.getLogger() 3007 try: 3008 raise RuntimeError() 3009 except RuntimeError: 3010 logging.exception("just testing") 3011 sys.stdout.seek(0) 3012 self.assertEqual(output.getvalue(), 3013 "ERROR:root:just testing\nGot a [RuntimeError]\n") 3014 # Original logger output is empty 3015 self.assert_log_lines([]) 3016 3017 def test_config4a_ok(self): 3018 # A config specifying a custom formatter class. 3019 with support.captured_stdout() as output: 3020 self.apply_config(self.config4a) 3021 #logger = logging.getLogger() 3022 try: 3023 raise RuntimeError() 3024 except RuntimeError: 3025 logging.exception("just testing") 3026 sys.stdout.seek(0) 3027 self.assertEqual(output.getvalue(), 3028 "ERROR:root:just testing\nGot a [RuntimeError]\n") 3029 # Original logger output is empty 3030 self.assert_log_lines([]) 3031 3032 def test_config5_ok(self): 3033 self.test_config1_ok(config=self.config5) 3034 3035 def test_config6_failure(self): 3036 self.assertRaises(Exception, self.apply_config, self.config6) 3037 3038 def test_config7_ok(self): 3039 with support.captured_stdout() as output: 3040 self.apply_config(self.config1) 3041 logger = logging.getLogger("compiler.parser") 3042 # Both will output a message 3043 logger.info(self.next_message()) 3044 logger.error(self.next_message()) 3045 self.assert_log_lines([ 3046 ('INFO', '1'), 3047 ('ERROR', '2'), 3048 ], stream=output) 3049 # Original logger output is empty. 3050 self.assert_log_lines([]) 3051 with support.captured_stdout() as output: 3052 self.apply_config(self.config7) 3053 logger = logging.getLogger("compiler.parser") 3054 self.assertTrue(logger.disabled) 3055 logger = logging.getLogger("compiler.lexer") 3056 # Both will output a message 3057 logger.info(self.next_message()) 3058 logger.error(self.next_message()) 3059 self.assert_log_lines([ 3060 ('INFO', '3'), 3061 ('ERROR', '4'), 3062 ], stream=output) 3063 # Original logger output is empty. 3064 self.assert_log_lines([]) 3065 3066 # Same as test_config_7_ok but don't disable old loggers. 3067 def test_config_8_ok(self): 3068 with support.captured_stdout() as output: 3069 self.apply_config(self.config1) 3070 logger = logging.getLogger("compiler.parser") 3071 # All will output a message 3072 logger.info(self.next_message()) 3073 logger.error(self.next_message()) 3074 self.assert_log_lines([ 3075 ('INFO', '1'), 3076 ('ERROR', '2'), 3077 ], stream=output) 3078 # Original logger output is empty. 3079 self.assert_log_lines([]) 3080 with support.captured_stdout() as output: 3081 self.apply_config(self.config8) 3082 logger = logging.getLogger("compiler.parser") 3083 self.assertFalse(logger.disabled) 3084 # Both will output a message 3085 logger.info(self.next_message()) 3086 logger.error(self.next_message()) 3087 logger = logging.getLogger("compiler.lexer") 3088 # Both will output a message 3089 logger.info(self.next_message()) 3090 logger.error(self.next_message()) 3091 self.assert_log_lines([ 3092 ('INFO', '3'), 3093 ('ERROR', '4'), 3094 ('INFO', '5'), 3095 ('ERROR', '6'), 3096 ], stream=output) 3097 # Original logger output is empty. 3098 self.assert_log_lines([]) 3099 3100 def test_config_8a_ok(self): 3101 with support.captured_stdout() as output: 3102 self.apply_config(self.config1a) 3103 logger = logging.getLogger("compiler.parser") 3104 # See issue #11424. compiler-hyphenated sorts 3105 # between compiler and compiler.xyz and this 3106 # was preventing compiler.xyz from being included 3107 # in the child loggers of compiler because of an 3108 # overzealous loop termination condition. 3109 hyphenated = logging.getLogger('compiler-hyphenated') 3110 # All will output a message 3111 logger.info(self.next_message()) 3112 logger.error(self.next_message()) 3113 hyphenated.critical(self.next_message()) 3114 self.assert_log_lines([ 3115 ('INFO', '1'), 3116 ('ERROR', '2'), 3117 ('CRITICAL', '3'), 3118 ], stream=output) 3119 # Original logger output is empty. 3120 self.assert_log_lines([]) 3121 with support.captured_stdout() as output: 3122 self.apply_config(self.config8a) 3123 logger = logging.getLogger("compiler.parser") 3124 self.assertFalse(logger.disabled) 3125 # Both will output a message 3126 logger.info(self.next_message()) 3127 logger.error(self.next_message()) 3128 logger = logging.getLogger("compiler.lexer") 3129 # Both will output a message 3130 logger.info(self.next_message()) 3131 logger.error(self.next_message()) 3132 # Will not appear 3133 hyphenated.critical(self.next_message()) 3134 self.assert_log_lines([ 3135 ('INFO', '4'), 3136 ('ERROR', '5'), 3137 ('INFO', '6'), 3138 ('ERROR', '7'), 3139 ], stream=output) 3140 # Original logger output is empty. 3141 self.assert_log_lines([]) 3142 3143 def test_config_9_ok(self): 3144 with support.captured_stdout() as output: 3145 self.apply_config(self.config9) 3146 logger = logging.getLogger("compiler.parser") 3147 # Nothing will be output since both handler and logger are set to WARNING 3148 logger.info(self.next_message()) 3149 self.assert_log_lines([], stream=output) 3150 self.apply_config(self.config9a) 3151 # Nothing will be output since handler is still set to WARNING 3152 logger.info(self.next_message()) 3153 self.assert_log_lines([], stream=output) 3154 self.apply_config(self.config9b) 3155 # Message should now be output 3156 logger.info(self.next_message()) 3157 self.assert_log_lines([ 3158 ('INFO', '3'), 3159 ], stream=output) 3160 3161 def test_config_10_ok(self): 3162 with support.captured_stdout() as output: 3163 self.apply_config(self.config10) 3164 logger = logging.getLogger("compiler.parser") 3165 logger.warning(self.next_message()) 3166 logger = logging.getLogger('compiler') 3167 # Not output, because filtered 3168 logger.warning(self.next_message()) 3169 logger = logging.getLogger('compiler.lexer') 3170 # Not output, because filtered 3171 logger.warning(self.next_message()) 3172 logger = logging.getLogger("compiler.parser.codegen") 3173 # Output, as not filtered 3174 logger.error(self.next_message()) 3175 self.assert_log_lines([ 3176 ('WARNING', '1'), 3177 ('ERROR', '4'), 3178 ], stream=output) 3179 3180 def test_config11_ok(self): 3181 self.test_config1_ok(self.config11) 3182 3183 def test_config12_failure(self): 3184 self.assertRaises(Exception, self.apply_config, self.config12) 3185 3186 def test_config13_failure(self): 3187 self.assertRaises(Exception, self.apply_config, self.config13) 3188 3189 def test_config14_ok(self): 3190 with support.captured_stdout() as output: 3191 self.apply_config(self.config14) 3192 h = logging._handlers['hand1'] 3193 self.assertEqual(h.foo, 'bar') 3194 self.assertEqual(h.terminator, '!\n') 3195 logging.warning('Exclamation') 3196 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 3197 3198 def test_config15_ok(self): 3199 3200 def cleanup(h1, fn): 3201 h1.close() 3202 os.remove(fn) 3203 3204 with self.check_no_resource_warning(): 3205 fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 3206 os.close(fd) 3207 3208 config = { 3209 "version": 1, 3210 "handlers": { 3211 "file": { 3212 "class": "logging.FileHandler", 3213 "filename": fn, 3214 "encoding": "utf-8", 3215 } 3216 }, 3217 "root": { 3218 "handlers": ["file"] 3219 } 3220 } 3221 3222 self.apply_config(config) 3223 self.apply_config(config) 3224 3225 handler = logging.root.handlers[0] 3226 self.addCleanup(cleanup, handler, fn) 3227 3228 def setup_via_listener(self, text, verify=None): 3229 text = text.encode("utf-8") 3230 # Ask for a randomly assigned port (by using port 0) 3231 t = logging.config.listen(0, verify) 3232 t.start() 3233 t.ready.wait() 3234 # Now get the port allocated 3235 port = t.port 3236 t.ready.clear() 3237 try: 3238 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3239 sock.settimeout(2.0) 3240 sock.connect(('localhost', port)) 3241 3242 slen = struct.pack('>L', len(text)) 3243 s = slen + text 3244 sentsofar = 0 3245 left = len(s) 3246 while left > 0: 3247 sent = sock.send(s[sentsofar:]) 3248 sentsofar += sent 3249 left -= sent 3250 sock.close() 3251 finally: 3252 t.ready.wait(2.0) 3253 logging.config.stopListening() 3254 threading_helper.join_thread(t) 3255 3256 def test_listen_config_10_ok(self): 3257 with support.captured_stdout() as output: 3258 self.setup_via_listener(json.dumps(self.config10)) 3259 logger = logging.getLogger("compiler.parser") 3260 logger.warning(self.next_message()) 3261 logger = logging.getLogger('compiler') 3262 # Not output, because filtered 3263 logger.warning(self.next_message()) 3264 logger = logging.getLogger('compiler.lexer') 3265 # Not output, because filtered 3266 logger.warning(self.next_message()) 3267 logger = logging.getLogger("compiler.parser.codegen") 3268 # Output, as not filtered 3269 logger.error(self.next_message()) 3270 self.assert_log_lines([ 3271 ('WARNING', '1'), 3272 ('ERROR', '4'), 3273 ], stream=output) 3274 3275 def test_listen_config_1_ok(self): 3276 with support.captured_stdout() as output: 3277 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 3278 logger = logging.getLogger("compiler.parser") 3279 # Both will output a message 3280 logger.info(self.next_message()) 3281 logger.error(self.next_message()) 3282 self.assert_log_lines([ 3283 ('INFO', '1'), 3284 ('ERROR', '2'), 3285 ], stream=output) 3286 # Original logger output is empty. 3287 self.assert_log_lines([]) 3288 3289 def test_listen_verify(self): 3290 3291 def verify_fail(stuff): 3292 return None 3293 3294 def verify_reverse(stuff): 3295 return stuff[::-1] 3296 3297 logger = logging.getLogger("compiler.parser") 3298 to_send = textwrap.dedent(ConfigFileTest.config1) 3299 # First, specify a verification function that will fail. 3300 # We expect to see no output, since our configuration 3301 # never took effect. 3302 with support.captured_stdout() as output: 3303 self.setup_via_listener(to_send, verify_fail) 3304 # Both will output a message 3305 logger.info(self.next_message()) 3306 logger.error(self.next_message()) 3307 self.assert_log_lines([], stream=output) 3308 # Original logger output has the stuff we logged. 3309 self.assert_log_lines([ 3310 ('INFO', '1'), 3311 ('ERROR', '2'), 3312 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3313 3314 # Now, perform no verification. Our configuration 3315 # should take effect. 3316 3317 with support.captured_stdout() as output: 3318 self.setup_via_listener(to_send) # no verify callable specified 3319 logger = logging.getLogger("compiler.parser") 3320 # Both will output a message 3321 logger.info(self.next_message()) 3322 logger.error(self.next_message()) 3323 self.assert_log_lines([ 3324 ('INFO', '3'), 3325 ('ERROR', '4'), 3326 ], stream=output) 3327 # Original logger output still has the stuff we logged before. 3328 self.assert_log_lines([ 3329 ('INFO', '1'), 3330 ('ERROR', '2'), 3331 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3332 3333 # Now, perform verification which transforms the bytes. 3334 3335 with support.captured_stdout() as output: 3336 self.setup_via_listener(to_send[::-1], verify_reverse) 3337 logger = logging.getLogger("compiler.parser") 3338 # Both will output a message 3339 logger.info(self.next_message()) 3340 logger.error(self.next_message()) 3341 self.assert_log_lines([ 3342 ('INFO', '5'), 3343 ('ERROR', '6'), 3344 ], stream=output) 3345 # Original logger output still has the stuff we logged before. 3346 self.assert_log_lines([ 3347 ('INFO', '1'), 3348 ('ERROR', '2'), 3349 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3350 3351 def test_out_of_order(self): 3352 self.assertRaises(ValueError, self.apply_config, self.out_of_order) 3353 3354 def test_out_of_order_with_dollar_style(self): 3355 config = copy.deepcopy(self.out_of_order) 3356 config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}" 3357 3358 self.apply_config(config) 3359 handler = logging.getLogger('mymodule').handlers[0] 3360 self.assertIsInstance(handler.target, logging.Handler) 3361 self.assertIsInstance(handler.formatter._style, 3362 logging.StringTemplateStyle) 3363 3364 def test_custom_formatter_class_with_validate(self): 3365 self.apply_config(self.custom_formatter_class_validate) 3366 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3367 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3368 3369 def test_custom_formatter_class_with_validate2(self): 3370 self.apply_config(self.custom_formatter_class_validate2) 3371 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3372 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3373 3374 def test_custom_formatter_class_with_validate2_with_wrong_fmt(self): 3375 config = self.custom_formatter_class_validate.copy() 3376 config['formatters']['form1']['style'] = "$" 3377 3378 # Exception should not be raise as we have configured 'validate' to False 3379 self.apply_config(config) 3380 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3381 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3382 3383 def test_custom_formatter_class_with_validate3(self): 3384 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3) 3385 3386 def test_custom_formatter_function_with_validate(self): 3387 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function) 3388 3389 def test_baseconfig(self): 3390 d = { 3391 'atuple': (1, 2, 3), 3392 'alist': ['a', 'b', 'c'], 3393 'adict': {'d': 'e', 'f': 3 }, 3394 'nest1': ('g', ('h', 'i'), 'j'), 3395 'nest2': ['k', ['l', 'm'], 'n'], 3396 'nest3': ['o', 'cfg://alist', 'p'], 3397 } 3398 bc = logging.config.BaseConfigurator(d) 3399 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 3400 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 3401 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 3402 self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3403 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3404 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3405 v = bc.convert('cfg://nest3') 3406 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3407 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3408 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3409 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3410 3411 def test_namedtuple(self): 3412 # see bpo-39142 3413 from collections import namedtuple 3414 3415 class MyHandler(logging.StreamHandler): 3416 def __init__(self, resource, *args, **kwargs): 3417 super().__init__(*args, **kwargs) 3418 self.resource: namedtuple = resource 3419 3420 def emit(self, record): 3421 record.msg += f' {self.resource.type}' 3422 return super().emit(record) 3423 3424 Resource = namedtuple('Resource', ['type', 'labels']) 3425 resource = Resource(type='my_type', labels=['a']) 3426 3427 config = { 3428 'version': 1, 3429 'handlers': { 3430 'myhandler': { 3431 '()': MyHandler, 3432 'resource': resource 3433 } 3434 }, 3435 'root': {'level': 'INFO', 'handlers': ['myhandler']}, 3436 } 3437 with support.captured_stderr() as stderr: 3438 self.apply_config(config) 3439 logging.info('some log') 3440 self.assertEqual(stderr.getvalue(), 'some log my_type\n') 3441 3442class ManagerTest(BaseTest): 3443 def test_manager_loggerclass(self): 3444 logged = [] 3445 3446 class MyLogger(logging.Logger): 3447 def _log(self, level, msg, args, exc_info=None, extra=None): 3448 logged.append(msg) 3449 3450 man = logging.Manager(None) 3451 self.assertRaises(TypeError, man.setLoggerClass, int) 3452 man.setLoggerClass(MyLogger) 3453 logger = man.getLogger('test') 3454 logger.warning('should appear in logged') 3455 logging.warning('should not appear in logged') 3456 3457 self.assertEqual(logged, ['should appear in logged']) 3458 3459 def test_set_log_record_factory(self): 3460 man = logging.Manager(None) 3461 expected = object() 3462 man.setLogRecordFactory(expected) 3463 self.assertEqual(man.logRecordFactory, expected) 3464 3465class ChildLoggerTest(BaseTest): 3466 def test_child_loggers(self): 3467 r = logging.getLogger() 3468 l1 = logging.getLogger('abc') 3469 l2 = logging.getLogger('def.ghi') 3470 c1 = r.getChild('xyz') 3471 c2 = r.getChild('uvw.xyz') 3472 self.assertIs(c1, logging.getLogger('xyz')) 3473 self.assertIs(c2, logging.getLogger('uvw.xyz')) 3474 c1 = l1.getChild('def') 3475 c2 = c1.getChild('ghi') 3476 c3 = l1.getChild('def.ghi') 3477 self.assertIs(c1, logging.getLogger('abc.def')) 3478 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 3479 self.assertIs(c2, c3) 3480 3481 3482class DerivedLogRecord(logging.LogRecord): 3483 pass 3484 3485class LogRecordFactoryTest(BaseTest): 3486 3487 def setUp(self): 3488 class CheckingFilter(logging.Filter): 3489 def __init__(self, cls): 3490 self.cls = cls 3491 3492 def filter(self, record): 3493 t = type(record) 3494 if t is not self.cls: 3495 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 3496 self.cls) 3497 raise TypeError(msg) 3498 return True 3499 3500 BaseTest.setUp(self) 3501 self.filter = CheckingFilter(DerivedLogRecord) 3502 self.root_logger.addFilter(self.filter) 3503 self.orig_factory = logging.getLogRecordFactory() 3504 3505 def tearDown(self): 3506 self.root_logger.removeFilter(self.filter) 3507 BaseTest.tearDown(self) 3508 logging.setLogRecordFactory(self.orig_factory) 3509 3510 def test_logrecord_class(self): 3511 self.assertRaises(TypeError, self.root_logger.warning, 3512 self.next_message()) 3513 logging.setLogRecordFactory(DerivedLogRecord) 3514 self.root_logger.error(self.next_message()) 3515 self.assert_log_lines([ 3516 ('root', 'ERROR', '2'), 3517 ]) 3518 3519 3520class QueueHandlerTest(BaseTest): 3521 # Do not bother with a logger name group. 3522 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 3523 3524 def setUp(self): 3525 BaseTest.setUp(self) 3526 self.queue = queue.Queue(-1) 3527 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 3528 self.name = 'que' 3529 self.que_logger = logging.getLogger('que') 3530 self.que_logger.propagate = False 3531 self.que_logger.setLevel(logging.WARNING) 3532 self.que_logger.addHandler(self.que_hdlr) 3533 3534 def tearDown(self): 3535 self.que_hdlr.close() 3536 BaseTest.tearDown(self) 3537 3538 def test_queue_handler(self): 3539 self.que_logger.debug(self.next_message()) 3540 self.assertRaises(queue.Empty, self.queue.get_nowait) 3541 self.que_logger.info(self.next_message()) 3542 self.assertRaises(queue.Empty, self.queue.get_nowait) 3543 msg = self.next_message() 3544 self.que_logger.warning(msg) 3545 data = self.queue.get_nowait() 3546 self.assertTrue(isinstance(data, logging.LogRecord)) 3547 self.assertEqual(data.name, self.que_logger.name) 3548 self.assertEqual((data.msg, data.args), (msg, None)) 3549 3550 def test_formatting(self): 3551 msg = self.next_message() 3552 levelname = logging.getLevelName(logging.WARNING) 3553 log_format_str = '{name} -> {levelname}: {message}' 3554 formatted_msg = log_format_str.format(name=self.name, 3555 levelname=levelname, message=msg) 3556 formatter = logging.Formatter(self.log_format) 3557 self.que_hdlr.setFormatter(formatter) 3558 self.que_logger.warning(msg) 3559 log_record = self.queue.get_nowait() 3560 self.assertEqual(formatted_msg, log_record.msg) 3561 self.assertEqual(formatted_msg, log_record.message) 3562 3563 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3564 'logging.handlers.QueueListener required for this test') 3565 def test_queue_listener(self): 3566 handler = TestHandler(support.Matcher()) 3567 listener = logging.handlers.QueueListener(self.queue, handler) 3568 listener.start() 3569 try: 3570 self.que_logger.warning(self.next_message()) 3571 self.que_logger.error(self.next_message()) 3572 self.que_logger.critical(self.next_message()) 3573 finally: 3574 listener.stop() 3575 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 3576 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 3577 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 3578 handler.close() 3579 3580 # Now test with respect_handler_level set 3581 3582 handler = TestHandler(support.Matcher()) 3583 handler.setLevel(logging.CRITICAL) 3584 listener = logging.handlers.QueueListener(self.queue, handler, 3585 respect_handler_level=True) 3586 listener.start() 3587 try: 3588 self.que_logger.warning(self.next_message()) 3589 self.que_logger.error(self.next_message()) 3590 self.que_logger.critical(self.next_message()) 3591 finally: 3592 listener.stop() 3593 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 3594 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 3595 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 3596 handler.close() 3597 3598 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3599 'logging.handlers.QueueListener required for this test') 3600 def test_queue_listener_with_StreamHandler(self): 3601 # Test that traceback only appends once (bpo-34334). 3602 listener = logging.handlers.QueueListener(self.queue, self.root_hdlr) 3603 listener.start() 3604 try: 3605 1 / 0 3606 except ZeroDivisionError as e: 3607 exc = e 3608 self.que_logger.exception(self.next_message(), exc_info=exc) 3609 listener.stop() 3610 self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1) 3611 3612 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3613 'logging.handlers.QueueListener required for this test') 3614 def test_queue_listener_with_multiple_handlers(self): 3615 # Test that queue handler format doesn't affect other handler formats (bpo-35726). 3616 self.que_hdlr.setFormatter(self.root_formatter) 3617 self.que_logger.addHandler(self.root_hdlr) 3618 3619 listener = logging.handlers.QueueListener(self.queue, self.que_hdlr) 3620 listener.start() 3621 self.que_logger.error("error") 3622 listener.stop() 3623 self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error") 3624 3625if hasattr(logging.handlers, 'QueueListener'): 3626 import multiprocessing 3627 from unittest.mock import patch 3628 3629 class QueueListenerTest(BaseTest): 3630 """ 3631 Tests based on patch submitted for issue #27930. Ensure that 3632 QueueListener handles all log messages. 3633 """ 3634 3635 repeat = 20 3636 3637 @staticmethod 3638 def setup_and_log(log_queue, ident): 3639 """ 3640 Creates a logger with a QueueHandler that logs to a queue read by a 3641 QueueListener. Starts the listener, logs five messages, and stops 3642 the listener. 3643 """ 3644 logger = logging.getLogger('test_logger_with_id_%s' % ident) 3645 logger.setLevel(logging.DEBUG) 3646 handler = logging.handlers.QueueHandler(log_queue) 3647 logger.addHandler(handler) 3648 listener = logging.handlers.QueueListener(log_queue) 3649 listener.start() 3650 3651 logger.info('one') 3652 logger.info('two') 3653 logger.info('three') 3654 logger.info('four') 3655 logger.info('five') 3656 3657 listener.stop() 3658 logger.removeHandler(handler) 3659 handler.close() 3660 3661 @patch.object(logging.handlers.QueueListener, 'handle') 3662 def test_handle_called_with_queue_queue(self, mock_handle): 3663 for i in range(self.repeat): 3664 log_queue = queue.Queue() 3665 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3666 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3667 'correct number of handled log messages') 3668 3669 @patch.object(logging.handlers.QueueListener, 'handle') 3670 def test_handle_called_with_mp_queue(self, mock_handle): 3671 # bpo-28668: The multiprocessing (mp) module is not functional 3672 # when the mp.synchronize module cannot be imported. 3673 support.skip_if_broken_multiprocessing_synchronize() 3674 for i in range(self.repeat): 3675 log_queue = multiprocessing.Queue() 3676 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3677 log_queue.close() 3678 log_queue.join_thread() 3679 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3680 'correct number of handled log messages') 3681 3682 @staticmethod 3683 def get_all_from_queue(log_queue): 3684 try: 3685 while True: 3686 yield log_queue.get_nowait() 3687 except queue.Empty: 3688 return [] 3689 3690 def test_no_messages_in_queue_after_stop(self): 3691 """ 3692 Five messages are logged then the QueueListener is stopped. This 3693 test then gets everything off the queue. Failure of this test 3694 indicates that messages were not registered on the queue until 3695 _after_ the QueueListener stopped. 3696 """ 3697 # bpo-28668: The multiprocessing (mp) module is not functional 3698 # when the mp.synchronize module cannot be imported. 3699 support.skip_if_broken_multiprocessing_synchronize() 3700 for i in range(self.repeat): 3701 queue = multiprocessing.Queue() 3702 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3703 # time.sleep(1) 3704 items = list(self.get_all_from_queue(queue)) 3705 queue.close() 3706 queue.join_thread() 3707 3708 expected = [[], [logging.handlers.QueueListener._sentinel]] 3709 self.assertIn(items, expected, 3710 'Found unexpected messages in queue: %s' % ( 3711 [m.msg if isinstance(m, logging.LogRecord) 3712 else m for m in items])) 3713 3714 def test_calls_task_done_after_stop(self): 3715 # Issue 36813: Make sure queue.join does not deadlock. 3716 log_queue = queue.Queue() 3717 listener = logging.handlers.QueueListener(log_queue) 3718 listener.start() 3719 listener.stop() 3720 with self.assertRaises(ValueError): 3721 # Make sure all tasks are done and .join won't block. 3722 log_queue.task_done() 3723 3724 3725ZERO = datetime.timedelta(0) 3726 3727class UTC(datetime.tzinfo): 3728 def utcoffset(self, dt): 3729 return ZERO 3730 3731 dst = utcoffset 3732 3733 def tzname(self, dt): 3734 return 'UTC' 3735 3736utc = UTC() 3737 3738class AssertErrorMessage: 3739 3740 def assert_error_message(self, exception, message, *args, **kwargs): 3741 try: 3742 self.assertRaises((), *args, **kwargs) 3743 except exception as e: 3744 self.assertEqual(message, str(e)) 3745 3746class FormatterTest(unittest.TestCase, AssertErrorMessage): 3747 def setUp(self): 3748 self.common = { 3749 'name': 'formatter.test', 3750 'level': logging.DEBUG, 3751 'pathname': os.path.join('path', 'to', 'dummy.ext'), 3752 'lineno': 42, 3753 'exc_info': None, 3754 'func': None, 3755 'msg': 'Message with %d %s', 3756 'args': (2, 'placeholders'), 3757 } 3758 self.variants = { 3759 'custom': { 3760 'custom': 1234 3761 } 3762 } 3763 3764 def get_record(self, name=None): 3765 result = dict(self.common) 3766 if name is not None: 3767 result.update(self.variants[name]) 3768 return logging.makeLogRecord(result) 3769 3770 def test_percent(self): 3771 # Test %-formatting 3772 r = self.get_record() 3773 f = logging.Formatter('${%(message)s}') 3774 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 3775 f = logging.Formatter('%(random)s') 3776 self.assertRaises(ValueError, f.format, r) 3777 self.assertFalse(f.usesTime()) 3778 f = logging.Formatter('%(asctime)s') 3779 self.assertTrue(f.usesTime()) 3780 f = logging.Formatter('%(asctime)-15s') 3781 self.assertTrue(f.usesTime()) 3782 f = logging.Formatter('%(asctime)#15s') 3783 self.assertTrue(f.usesTime()) 3784 3785 def test_braces(self): 3786 # Test {}-formatting 3787 r = self.get_record() 3788 f = logging.Formatter('$%{message}%$', style='{') 3789 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3790 f = logging.Formatter('{random}', style='{') 3791 self.assertRaises(ValueError, f.format, r) 3792 f = logging.Formatter("{message}", style='{') 3793 self.assertFalse(f.usesTime()) 3794 f = logging.Formatter('{asctime}', style='{') 3795 self.assertTrue(f.usesTime()) 3796 f = logging.Formatter('{asctime!s:15}', style='{') 3797 self.assertTrue(f.usesTime()) 3798 f = logging.Formatter('{asctime:15}', style='{') 3799 self.assertTrue(f.usesTime()) 3800 3801 def test_dollars(self): 3802 # Test $-formatting 3803 r = self.get_record() 3804 f = logging.Formatter('${message}', style='$') 3805 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3806 f = logging.Formatter('$message', style='$') 3807 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3808 f = logging.Formatter('$$%${message}%$$', style='$') 3809 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3810 f = logging.Formatter('${random}', style='$') 3811 self.assertRaises(ValueError, f.format, r) 3812 self.assertFalse(f.usesTime()) 3813 f = logging.Formatter('${asctime}', style='$') 3814 self.assertTrue(f.usesTime()) 3815 f = logging.Formatter('$asctime', style='$') 3816 self.assertTrue(f.usesTime()) 3817 f = logging.Formatter('${message}', style='$') 3818 self.assertFalse(f.usesTime()) 3819 f = logging.Formatter('${asctime}--', style='$') 3820 self.assertTrue(f.usesTime()) 3821 3822 def test_format_validate(self): 3823 # Check correct formatting 3824 # Percentage style 3825 f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3826 self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3827 f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3828 self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3829 f = logging.Formatter("%(process)#+027.23X") 3830 self.assertEqual(f._fmt, "%(process)#+027.23X") 3831 f = logging.Formatter("%(foo)#.*g") 3832 self.assertEqual(f._fmt, "%(foo)#.*g") 3833 3834 # StrFormat Style 3835 f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{") 3836 self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}") 3837 f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{") 3838 self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}") 3839 f = logging.Formatter("{customfield!s:#<30}", style="{") 3840 self.assertEqual(f._fmt, "{customfield!s:#<30}") 3841 f = logging.Formatter("{message!r}", style="{") 3842 self.assertEqual(f._fmt, "{message!r}") 3843 f = logging.Formatter("{message!s}", style="{") 3844 self.assertEqual(f._fmt, "{message!s}") 3845 f = logging.Formatter("{message!a}", style="{") 3846 self.assertEqual(f._fmt, "{message!a}") 3847 f = logging.Formatter("{process!r:4.2}", style="{") 3848 self.assertEqual(f._fmt, "{process!r:4.2}") 3849 f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{") 3850 self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}") 3851 f = logging.Formatter("{process!s:{w},.{p}}", style="{") 3852 self.assertEqual(f._fmt, "{process!s:{w},.{p}}") 3853 f = logging.Formatter("{foo:12.{p}}", style="{") 3854 self.assertEqual(f._fmt, "{foo:12.{p}}") 3855 f = logging.Formatter("{foo:{w}.6}", style="{") 3856 self.assertEqual(f._fmt, "{foo:{w}.6}") 3857 f = logging.Formatter("{foo[0].bar[1].baz}", style="{") 3858 self.assertEqual(f._fmt, "{foo[0].bar[1].baz}") 3859 f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{") 3860 self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}") 3861 f = logging.Formatter("{12[k1].bar[k2].baz}", style="{") 3862 self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}") 3863 3864 # Dollar style 3865 f = logging.Formatter("${asctime} - $message", style="$") 3866 self.assertEqual(f._fmt, "${asctime} - $message") 3867 f = logging.Formatter("$bar $$", style="$") 3868 self.assertEqual(f._fmt, "$bar $$") 3869 f = logging.Formatter("$bar $$$$", style="$") 3870 self.assertEqual(f._fmt, "$bar $$$$") # this would print two $($$) 3871 3872 # Testing when ValueError being raised from incorrect format 3873 # Percentage Style 3874 self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z") 3875 self.assertRaises(ValueError, logging.Formatter, "%(asctime)b") 3876 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*") 3877 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s") 3878 self.assertRaises(ValueError, logging.Formatter, "%(asctime)_") 3879 self.assertRaises(ValueError, logging.Formatter, '{asctime}') 3880 self.assertRaises(ValueError, logging.Formatter, '${message}') 3881 self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f') # with both * and decimal number as precision 3882 self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f') 3883 3884 # StrFormat Style 3885 # Testing failure for '-' in field name 3886 self.assert_error_message( 3887 ValueError, 3888 "invalid format: invalid field name/expression: 'name-thing'", 3889 logging.Formatter, "{name-thing}", style="{" 3890 ) 3891 # Testing failure for style mismatch 3892 self.assert_error_message( 3893 ValueError, 3894 "invalid format: no fields", 3895 logging.Formatter, '%(asctime)s', style='{' 3896 ) 3897 # Testing failure for invalid conversion 3898 self.assert_error_message( 3899 ValueError, 3900 "invalid conversion: 'Z'" 3901 ) 3902 self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{') 3903 self.assert_error_message( 3904 ValueError, 3905 "invalid format: expected ':' after conversion specifier", 3906 logging.Formatter, '{asctime!aa:15}', style='{' 3907 ) 3908 # Testing failure for invalid spec 3909 self.assert_error_message( 3910 ValueError, 3911 "invalid format: bad specifier: '.2ff'", 3912 logging.Formatter, '{process:.2ff}', style='{' 3913 ) 3914 self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{') 3915 self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{') 3916 self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{') 3917 self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{') 3918 # Testing failure for mismatch braces 3919 self.assert_error_message( 3920 ValueError, 3921 "invalid format: expected '}' before end of string", 3922 logging.Formatter, '{process', style='{' 3923 ) 3924 self.assert_error_message( 3925 ValueError, 3926 "invalid format: Single '}' encountered in format string", 3927 logging.Formatter, 'process}', style='{' 3928 ) 3929 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{') 3930 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{') 3931 self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{') 3932 self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{') 3933 self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{') 3934 self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{') 3935 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{') 3936 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{') 3937 self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{') 3938 3939 # Dollar style 3940 # Testing failure for mismatch bare $ 3941 self.assert_error_message( 3942 ValueError, 3943 "invalid format: bare \'$\' not allowed", 3944 logging.Formatter, '$bar $$$', style='$' 3945 ) 3946 self.assert_error_message( 3947 ValueError, 3948 "invalid format: bare \'$\' not allowed", 3949 logging.Formatter, 'bar $', style='$' 3950 ) 3951 self.assert_error_message( 3952 ValueError, 3953 "invalid format: bare \'$\' not allowed", 3954 logging.Formatter, 'foo $.', style='$' 3955 ) 3956 # Testing failure for mismatch style 3957 self.assert_error_message( 3958 ValueError, 3959 "invalid format: no fields", 3960 logging.Formatter, '{asctime}', style='$' 3961 ) 3962 self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$') 3963 3964 # Testing failure for incorrect fields 3965 self.assert_error_message( 3966 ValueError, 3967 "invalid format: no fields", 3968 logging.Formatter, 'foo', style='$' 3969 ) 3970 self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$') 3971 3972 def test_defaults_parameter(self): 3973 fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message'] 3974 styles = ['%', '{', '$'] 3975 for fmt, style in zip(fmts, styles): 3976 f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'}) 3977 r = self.get_record() 3978 self.assertEqual(f.format(r), 'Default Message with 2 placeholders') 3979 r = self.get_record("custom") 3980 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 3981 3982 # Without default 3983 f = logging.Formatter(fmt, style=style) 3984 r = self.get_record() 3985 self.assertRaises(ValueError, f.format, r) 3986 3987 # Non-existing default is ignored 3988 f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'}) 3989 r = self.get_record("custom") 3990 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 3991 3992 def test_invalid_style(self): 3993 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 3994 3995 def test_time(self): 3996 r = self.get_record() 3997 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 3998 # We use None to indicate we want the local timezone 3999 # We're essentially converting a UTC time to local time 4000 r.created = time.mktime(dt.astimezone(None).timetuple()) 4001 r.msecs = 123 4002 f = logging.Formatter('%(asctime)s %(message)s') 4003 f.converter = time.gmtime 4004 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 4005 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 4006 f.format(r) 4007 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 4008 4009 def test_default_msec_format_none(self): 4010 class NoMsecFormatter(logging.Formatter): 4011 default_msec_format = None 4012 default_time_format = '%d/%m/%Y %H:%M:%S' 4013 4014 r = self.get_record() 4015 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc) 4016 r.created = time.mktime(dt.astimezone(None).timetuple()) 4017 f = NoMsecFormatter() 4018 f.converter = time.gmtime 4019 self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00') 4020 4021 4022class TestBufferingFormatter(logging.BufferingFormatter): 4023 def formatHeader(self, records): 4024 return '[(%d)' % len(records) 4025 4026 def formatFooter(self, records): 4027 return '(%d)]' % len(records) 4028 4029class BufferingFormatterTest(unittest.TestCase): 4030 def setUp(self): 4031 self.records = [ 4032 logging.makeLogRecord({'msg': 'one'}), 4033 logging.makeLogRecord({'msg': 'two'}), 4034 ] 4035 4036 def test_default(self): 4037 f = logging.BufferingFormatter() 4038 self.assertEqual('', f.format([])) 4039 self.assertEqual('onetwo', f.format(self.records)) 4040 4041 def test_custom(self): 4042 f = TestBufferingFormatter() 4043 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 4044 lf = logging.Formatter('<%(message)s>') 4045 f = TestBufferingFormatter(lf) 4046 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 4047 4048class ExceptionTest(BaseTest): 4049 def test_formatting(self): 4050 r = self.root_logger 4051 h = RecordingHandler() 4052 r.addHandler(h) 4053 try: 4054 raise RuntimeError('deliberate mistake') 4055 except: 4056 logging.exception('failed', stack_info=True) 4057 r.removeHandler(h) 4058 h.close() 4059 r = h.records[0] 4060 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 4061 'call last):\n')) 4062 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 4063 'deliberate mistake')) 4064 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 4065 'call last):\n')) 4066 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 4067 'stack_info=True)')) 4068 4069 4070class LastResortTest(BaseTest): 4071 def test_last_resort(self): 4072 # Test the last resort handler 4073 root = self.root_logger 4074 root.removeHandler(self.root_hdlr) 4075 old_lastresort = logging.lastResort 4076 old_raise_exceptions = logging.raiseExceptions 4077 4078 try: 4079 with support.captured_stderr() as stderr: 4080 root.debug('This should not appear') 4081 self.assertEqual(stderr.getvalue(), '') 4082 root.warning('Final chance!') 4083 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 4084 4085 # No handlers and no last resort, so 'No handlers' message 4086 logging.lastResort = None 4087 with support.captured_stderr() as stderr: 4088 root.warning('Final chance!') 4089 msg = 'No handlers could be found for logger "root"\n' 4090 self.assertEqual(stderr.getvalue(), msg) 4091 4092 # 'No handlers' message only printed once 4093 with support.captured_stderr() as stderr: 4094 root.warning('Final chance!') 4095 self.assertEqual(stderr.getvalue(), '') 4096 4097 # If raiseExceptions is False, no message is printed 4098 root.manager.emittedNoHandlerWarning = False 4099 logging.raiseExceptions = False 4100 with support.captured_stderr() as stderr: 4101 root.warning('Final chance!') 4102 self.assertEqual(stderr.getvalue(), '') 4103 finally: 4104 root.addHandler(self.root_hdlr) 4105 logging.lastResort = old_lastresort 4106 logging.raiseExceptions = old_raise_exceptions 4107 4108 4109class FakeHandler: 4110 4111 def __init__(self, identifier, called): 4112 for method in ('acquire', 'flush', 'close', 'release'): 4113 setattr(self, method, self.record_call(identifier, method, called)) 4114 4115 def record_call(self, identifier, method_name, called): 4116 def inner(): 4117 called.append('{} - {}'.format(identifier, method_name)) 4118 return inner 4119 4120 4121class RecordingHandler(logging.NullHandler): 4122 4123 def __init__(self, *args, **kwargs): 4124 super(RecordingHandler, self).__init__(*args, **kwargs) 4125 self.records = [] 4126 4127 def handle(self, record): 4128 """Keep track of all the emitted records.""" 4129 self.records.append(record) 4130 4131 4132class ShutdownTest(BaseTest): 4133 4134 """Test suite for the shutdown method.""" 4135 4136 def setUp(self): 4137 super(ShutdownTest, self).setUp() 4138 self.called = [] 4139 4140 raise_exceptions = logging.raiseExceptions 4141 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 4142 4143 def raise_error(self, error): 4144 def inner(): 4145 raise error() 4146 return inner 4147 4148 def test_no_failure(self): 4149 # create some fake handlers 4150 handler0 = FakeHandler(0, self.called) 4151 handler1 = FakeHandler(1, self.called) 4152 handler2 = FakeHandler(2, self.called) 4153 4154 # create live weakref to those handlers 4155 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 4156 4157 logging.shutdown(handlerList=list(handlers)) 4158 4159 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 4160 '1 - acquire', '1 - flush', '1 - close', '1 - release', 4161 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 4162 self.assertEqual(expected, self.called) 4163 4164 def _test_with_failure_in_method(self, method, error): 4165 handler = FakeHandler(0, self.called) 4166 setattr(handler, method, self.raise_error(error)) 4167 handlers = [logging.weakref.ref(handler)] 4168 4169 logging.shutdown(handlerList=list(handlers)) 4170 4171 self.assertEqual('0 - release', self.called[-1]) 4172 4173 def test_with_ioerror_in_acquire(self): 4174 self._test_with_failure_in_method('acquire', OSError) 4175 4176 def test_with_ioerror_in_flush(self): 4177 self._test_with_failure_in_method('flush', OSError) 4178 4179 def test_with_ioerror_in_close(self): 4180 self._test_with_failure_in_method('close', OSError) 4181 4182 def test_with_valueerror_in_acquire(self): 4183 self._test_with_failure_in_method('acquire', ValueError) 4184 4185 def test_with_valueerror_in_flush(self): 4186 self._test_with_failure_in_method('flush', ValueError) 4187 4188 def test_with_valueerror_in_close(self): 4189 self._test_with_failure_in_method('close', ValueError) 4190 4191 def test_with_other_error_in_acquire_without_raise(self): 4192 logging.raiseExceptions = False 4193 self._test_with_failure_in_method('acquire', IndexError) 4194 4195 def test_with_other_error_in_flush_without_raise(self): 4196 logging.raiseExceptions = False 4197 self._test_with_failure_in_method('flush', IndexError) 4198 4199 def test_with_other_error_in_close_without_raise(self): 4200 logging.raiseExceptions = False 4201 self._test_with_failure_in_method('close', IndexError) 4202 4203 def test_with_other_error_in_acquire_with_raise(self): 4204 logging.raiseExceptions = True 4205 self.assertRaises(IndexError, self._test_with_failure_in_method, 4206 'acquire', IndexError) 4207 4208 def test_with_other_error_in_flush_with_raise(self): 4209 logging.raiseExceptions = True 4210 self.assertRaises(IndexError, self._test_with_failure_in_method, 4211 'flush', IndexError) 4212 4213 def test_with_other_error_in_close_with_raise(self): 4214 logging.raiseExceptions = True 4215 self.assertRaises(IndexError, self._test_with_failure_in_method, 4216 'close', IndexError) 4217 4218 4219class ModuleLevelMiscTest(BaseTest): 4220 4221 """Test suite for some module level methods.""" 4222 4223 def test_disable(self): 4224 old_disable = logging.root.manager.disable 4225 # confirm our assumptions are correct 4226 self.assertEqual(old_disable, 0) 4227 self.addCleanup(logging.disable, old_disable) 4228 4229 logging.disable(83) 4230 self.assertEqual(logging.root.manager.disable, 83) 4231 4232 self.assertRaises(ValueError, logging.disable, "doesnotexists") 4233 4234 class _NotAnIntOrString: 4235 pass 4236 4237 self.assertRaises(TypeError, logging.disable, _NotAnIntOrString()) 4238 4239 logging.disable("WARN") 4240 4241 # test the default value introduced in 3.7 4242 # (Issue #28524) 4243 logging.disable() 4244 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 4245 4246 def _test_log(self, method, level=None): 4247 called = [] 4248 support.patch(self, logging, 'basicConfig', 4249 lambda *a, **kw: called.append((a, kw))) 4250 4251 recording = RecordingHandler() 4252 logging.root.addHandler(recording) 4253 4254 log_method = getattr(logging, method) 4255 if level is not None: 4256 log_method(level, "test me: %r", recording) 4257 else: 4258 log_method("test me: %r", recording) 4259 4260 self.assertEqual(len(recording.records), 1) 4261 record = recording.records[0] 4262 self.assertEqual(record.getMessage(), "test me: %r" % recording) 4263 4264 expected_level = level if level is not None else getattr(logging, method.upper()) 4265 self.assertEqual(record.levelno, expected_level) 4266 4267 # basicConfig was not called! 4268 self.assertEqual(called, []) 4269 4270 def test_log(self): 4271 self._test_log('log', logging.ERROR) 4272 4273 def test_debug(self): 4274 self._test_log('debug') 4275 4276 def test_info(self): 4277 self._test_log('info') 4278 4279 def test_warning(self): 4280 self._test_log('warning') 4281 4282 def test_error(self): 4283 self._test_log('error') 4284 4285 def test_critical(self): 4286 self._test_log('critical') 4287 4288 def test_set_logger_class(self): 4289 self.assertRaises(TypeError, logging.setLoggerClass, object) 4290 4291 class MyLogger(logging.Logger): 4292 pass 4293 4294 logging.setLoggerClass(MyLogger) 4295 self.assertEqual(logging.getLoggerClass(), MyLogger) 4296 4297 logging.setLoggerClass(logging.Logger) 4298 self.assertEqual(logging.getLoggerClass(), logging.Logger) 4299 4300 def test_subclass_logger_cache(self): 4301 # bpo-37258 4302 message = [] 4303 4304 class MyLogger(logging.getLoggerClass()): 4305 def __init__(self, name='MyLogger', level=logging.NOTSET): 4306 super().__init__(name, level) 4307 message.append('initialized') 4308 4309 logging.setLoggerClass(MyLogger) 4310 logger = logging.getLogger('just_some_logger') 4311 self.assertEqual(message, ['initialized']) 4312 stream = io.StringIO() 4313 h = logging.StreamHandler(stream) 4314 logger.addHandler(h) 4315 try: 4316 logger.setLevel(logging.DEBUG) 4317 logger.debug("hello") 4318 self.assertEqual(stream.getvalue().strip(), "hello") 4319 4320 stream.truncate(0) 4321 stream.seek(0) 4322 4323 logger.setLevel(logging.INFO) 4324 logger.debug("hello") 4325 self.assertEqual(stream.getvalue(), "") 4326 finally: 4327 logger.removeHandler(h) 4328 h.close() 4329 logging.setLoggerClass(logging.Logger) 4330 4331 def test_logging_at_shutdown(self): 4332 # bpo-20037: Doing text I/O late at interpreter shutdown must not crash 4333 code = textwrap.dedent(""" 4334 import logging 4335 4336 class A: 4337 def __del__(self): 4338 try: 4339 raise ValueError("some error") 4340 except Exception: 4341 logging.exception("exception in __del__") 4342 4343 a = A() 4344 """) 4345 rc, out, err = assert_python_ok("-c", code) 4346 err = err.decode() 4347 self.assertIn("exception in __del__", err) 4348 self.assertIn("ValueError: some error", err) 4349 4350 def test_logging_at_shutdown_open(self): 4351 # bpo-26789: FileHandler keeps a reference to the builtin open() 4352 # function to be able to open or reopen the file during Python 4353 # finalization. 4354 filename = os_helper.TESTFN 4355 self.addCleanup(os_helper.unlink, filename) 4356 4357 code = textwrap.dedent(f""" 4358 import builtins 4359 import logging 4360 4361 class A: 4362 def __del__(self): 4363 logging.error("log in __del__") 4364 4365 # basicConfig() opens the file, but logging.shutdown() closes 4366 # it at Python exit. When A.__del__() is called, 4367 # FileHandler._open() must be called again to re-open the file. 4368 logging.basicConfig(filename={filename!r}, encoding="utf-8") 4369 4370 a = A() 4371 4372 # Simulate the Python finalization which removes the builtin 4373 # open() function. 4374 del builtins.open 4375 """) 4376 assert_python_ok("-c", code) 4377 4378 with open(filename, encoding="utf-8") as fp: 4379 self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__") 4380 4381 def test_recursion_error(self): 4382 # Issue 36272 4383 code = textwrap.dedent(""" 4384 import logging 4385 4386 def rec(): 4387 logging.error("foo") 4388 rec() 4389 4390 rec() 4391 """) 4392 rc, out, err = assert_python_failure("-c", code) 4393 err = err.decode() 4394 self.assertNotIn("Cannot recover from stack overflow.", err) 4395 self.assertEqual(rc, 1) 4396 4397 4398class LogRecordTest(BaseTest): 4399 def test_str_rep(self): 4400 r = logging.makeLogRecord({}) 4401 s = str(r) 4402 self.assertTrue(s.startswith('<LogRecord: ')) 4403 self.assertTrue(s.endswith('>')) 4404 4405 def test_dict_arg(self): 4406 h = RecordingHandler() 4407 r = logging.getLogger() 4408 r.addHandler(h) 4409 d = {'less' : 'more' } 4410 logging.warning('less is %(less)s', d) 4411 self.assertIs(h.records[0].args, d) 4412 self.assertEqual(h.records[0].message, 'less is more') 4413 r.removeHandler(h) 4414 h.close() 4415 4416 @staticmethod # pickled as target of child process in the following test 4417 def _extract_logrecord_process_name(key, logMultiprocessing, conn=None): 4418 prev_logMultiprocessing = logging.logMultiprocessing 4419 logging.logMultiprocessing = logMultiprocessing 4420 try: 4421 import multiprocessing as mp 4422 name = mp.current_process().name 4423 4424 r1 = logging.makeLogRecord({'msg': f'msg1_{key}'}) 4425 4426 # https://bugs.python.org/issue45128 4427 with support.swap_item(sys.modules, 'multiprocessing', None): 4428 r2 = logging.makeLogRecord({'msg': f'msg2_{key}'}) 4429 4430 results = {'processName' : name, 4431 'r1.processName': r1.processName, 4432 'r2.processName': r2.processName, 4433 } 4434 finally: 4435 logging.logMultiprocessing = prev_logMultiprocessing 4436 if conn: 4437 conn.send(results) 4438 else: 4439 return results 4440 4441 def test_multiprocessing(self): 4442 multiprocessing_imported = 'multiprocessing' in sys.modules 4443 try: 4444 # logMultiprocessing is True by default 4445 self.assertEqual(logging.logMultiprocessing, True) 4446 4447 LOG_MULTI_PROCESSING = True 4448 # When logMultiprocessing == True: 4449 # In the main process processName = 'MainProcess' 4450 r = logging.makeLogRecord({}) 4451 self.assertEqual(r.processName, 'MainProcess') 4452 4453 results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING) 4454 self.assertEqual('MainProcess', results['processName']) 4455 self.assertEqual('MainProcess', results['r1.processName']) 4456 self.assertEqual('MainProcess', results['r2.processName']) 4457 4458 # In other processes, processName is correct when multiprocessing in imported, 4459 # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762). 4460 import multiprocessing 4461 parent_conn, child_conn = multiprocessing.Pipe() 4462 p = multiprocessing.Process( 4463 target=self._extract_logrecord_process_name, 4464 args=(2, LOG_MULTI_PROCESSING, child_conn,) 4465 ) 4466 p.start() 4467 results = parent_conn.recv() 4468 self.assertNotEqual('MainProcess', results['processName']) 4469 self.assertEqual(results['processName'], results['r1.processName']) 4470 self.assertEqual('MainProcess', results['r2.processName']) 4471 p.join() 4472 4473 finally: 4474 if multiprocessing_imported: 4475 import multiprocessing 4476 4477 def test_optional(self): 4478 r = logging.makeLogRecord({}) 4479 NOT_NONE = self.assertIsNotNone 4480 NOT_NONE(r.thread) 4481 NOT_NONE(r.threadName) 4482 NOT_NONE(r.process) 4483 NOT_NONE(r.processName) 4484 log_threads = logging.logThreads 4485 log_processes = logging.logProcesses 4486 log_multiprocessing = logging.logMultiprocessing 4487 try: 4488 logging.logThreads = False 4489 logging.logProcesses = False 4490 logging.logMultiprocessing = False 4491 r = logging.makeLogRecord({}) 4492 NONE = self.assertIsNone 4493 NONE(r.thread) 4494 NONE(r.threadName) 4495 NONE(r.process) 4496 NONE(r.processName) 4497 finally: 4498 logging.logThreads = log_threads 4499 logging.logProcesses = log_processes 4500 logging.logMultiprocessing = log_multiprocessing 4501 4502class BasicConfigTest(unittest.TestCase): 4503 4504 """Test suite for logging.basicConfig.""" 4505 4506 def setUp(self): 4507 super(BasicConfigTest, self).setUp() 4508 self.handlers = logging.root.handlers 4509 self.saved_handlers = logging._handlers.copy() 4510 self.saved_handler_list = logging._handlerList[:] 4511 self.original_logging_level = logging.root.level 4512 self.addCleanup(self.cleanup) 4513 logging.root.handlers = [] 4514 4515 def tearDown(self): 4516 for h in logging.root.handlers[:]: 4517 logging.root.removeHandler(h) 4518 h.close() 4519 super(BasicConfigTest, self).tearDown() 4520 4521 def cleanup(self): 4522 setattr(logging.root, 'handlers', self.handlers) 4523 logging._handlers.clear() 4524 logging._handlers.update(self.saved_handlers) 4525 logging._handlerList[:] = self.saved_handler_list 4526 logging.root.setLevel(self.original_logging_level) 4527 4528 def test_no_kwargs(self): 4529 logging.basicConfig() 4530 4531 # handler defaults to a StreamHandler to sys.stderr 4532 self.assertEqual(len(logging.root.handlers), 1) 4533 handler = logging.root.handlers[0] 4534 self.assertIsInstance(handler, logging.StreamHandler) 4535 self.assertEqual(handler.stream, sys.stderr) 4536 4537 formatter = handler.formatter 4538 # format defaults to logging.BASIC_FORMAT 4539 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 4540 # datefmt defaults to None 4541 self.assertIsNone(formatter.datefmt) 4542 # style defaults to % 4543 self.assertIsInstance(formatter._style, logging.PercentStyle) 4544 4545 # level is not explicitly set 4546 self.assertEqual(logging.root.level, self.original_logging_level) 4547 4548 def test_strformatstyle(self): 4549 with support.captured_stdout() as output: 4550 logging.basicConfig(stream=sys.stdout, style="{") 4551 logging.error("Log an error") 4552 sys.stdout.seek(0) 4553 self.assertEqual(output.getvalue().strip(), 4554 "ERROR:root:Log an error") 4555 4556 def test_stringtemplatestyle(self): 4557 with support.captured_stdout() as output: 4558 logging.basicConfig(stream=sys.stdout, style="$") 4559 logging.error("Log an error") 4560 sys.stdout.seek(0) 4561 self.assertEqual(output.getvalue().strip(), 4562 "ERROR:root:Log an error") 4563 4564 def test_filename(self): 4565 4566 def cleanup(h1, h2, fn): 4567 h1.close() 4568 h2.close() 4569 os.remove(fn) 4570 4571 logging.basicConfig(filename='test.log', encoding='utf-8') 4572 4573 self.assertEqual(len(logging.root.handlers), 1) 4574 handler = logging.root.handlers[0] 4575 self.assertIsInstance(handler, logging.FileHandler) 4576 4577 expected = logging.FileHandler('test.log', 'a', encoding='utf-8') 4578 self.assertEqual(handler.stream.mode, expected.stream.mode) 4579 self.assertEqual(handler.stream.name, expected.stream.name) 4580 self.addCleanup(cleanup, handler, expected, 'test.log') 4581 4582 def test_filemode(self): 4583 4584 def cleanup(h1, h2, fn): 4585 h1.close() 4586 h2.close() 4587 os.remove(fn) 4588 4589 logging.basicConfig(filename='test.log', filemode='wb') 4590 4591 handler = logging.root.handlers[0] 4592 expected = logging.FileHandler('test.log', 'wb') 4593 self.assertEqual(handler.stream.mode, expected.stream.mode) 4594 self.addCleanup(cleanup, handler, expected, 'test.log') 4595 4596 def test_stream(self): 4597 stream = io.StringIO() 4598 self.addCleanup(stream.close) 4599 logging.basicConfig(stream=stream) 4600 4601 self.assertEqual(len(logging.root.handlers), 1) 4602 handler = logging.root.handlers[0] 4603 self.assertIsInstance(handler, logging.StreamHandler) 4604 self.assertEqual(handler.stream, stream) 4605 4606 def test_format(self): 4607 logging.basicConfig(format='%(asctime)s - %(message)s') 4608 4609 formatter = logging.root.handlers[0].formatter 4610 self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s') 4611 4612 def test_datefmt(self): 4613 logging.basicConfig(datefmt='bar') 4614 4615 formatter = logging.root.handlers[0].formatter 4616 self.assertEqual(formatter.datefmt, 'bar') 4617 4618 def test_style(self): 4619 logging.basicConfig(style='$') 4620 4621 formatter = logging.root.handlers[0].formatter 4622 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 4623 4624 def test_level(self): 4625 old_level = logging.root.level 4626 self.addCleanup(logging.root.setLevel, old_level) 4627 4628 logging.basicConfig(level=57) 4629 self.assertEqual(logging.root.level, 57) 4630 # Test that second call has no effect 4631 logging.basicConfig(level=58) 4632 self.assertEqual(logging.root.level, 57) 4633 4634 def test_incompatible(self): 4635 assertRaises = self.assertRaises 4636 handlers = [logging.StreamHandler()] 4637 stream = sys.stderr 4638 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4639 stream=stream) 4640 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4641 handlers=handlers) 4642 assertRaises(ValueError, logging.basicConfig, stream=stream, 4643 handlers=handlers) 4644 # Issue 23207: test for invalid kwargs 4645 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 4646 # Should pop both filename and filemode even if filename is None 4647 logging.basicConfig(filename=None, filemode='a') 4648 4649 def test_handlers(self): 4650 handlers = [ 4651 logging.StreamHandler(), 4652 logging.StreamHandler(sys.stdout), 4653 logging.StreamHandler(), 4654 ] 4655 f = logging.Formatter() 4656 handlers[2].setFormatter(f) 4657 logging.basicConfig(handlers=handlers) 4658 self.assertIs(handlers[0], logging.root.handlers[0]) 4659 self.assertIs(handlers[1], logging.root.handlers[1]) 4660 self.assertIs(handlers[2], logging.root.handlers[2]) 4661 self.assertIsNotNone(handlers[0].formatter) 4662 self.assertIsNotNone(handlers[1].formatter) 4663 self.assertIs(handlers[2].formatter, f) 4664 self.assertIs(handlers[0].formatter, handlers[1].formatter) 4665 4666 def test_force(self): 4667 old_string_io = io.StringIO() 4668 new_string_io = io.StringIO() 4669 old_handlers = [logging.StreamHandler(old_string_io)] 4670 new_handlers = [logging.StreamHandler(new_string_io)] 4671 logging.basicConfig(level=logging.WARNING, handlers=old_handlers) 4672 logging.warning('warn') 4673 logging.info('info') 4674 logging.debug('debug') 4675 self.assertEqual(len(logging.root.handlers), 1) 4676 logging.basicConfig(level=logging.INFO, handlers=new_handlers, 4677 force=True) 4678 logging.warning('warn') 4679 logging.info('info') 4680 logging.debug('debug') 4681 self.assertEqual(len(logging.root.handlers), 1) 4682 self.assertEqual(old_string_io.getvalue().strip(), 4683 'WARNING:root:warn') 4684 self.assertEqual(new_string_io.getvalue().strip(), 4685 'WARNING:root:warn\nINFO:root:info') 4686 4687 def test_encoding(self): 4688 try: 4689 encoding = 'utf-8' 4690 logging.basicConfig(filename='test.log', encoding=encoding, 4691 errors='strict', 4692 format='%(message)s', level=logging.DEBUG) 4693 4694 self.assertEqual(len(logging.root.handlers), 1) 4695 handler = logging.root.handlers[0] 4696 self.assertIsInstance(handler, logging.FileHandler) 4697 self.assertEqual(handler.encoding, encoding) 4698 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4699 finally: 4700 handler.close() 4701 with open('test.log', encoding='utf-8') as f: 4702 data = f.read().strip() 4703 os.remove('test.log') 4704 self.assertEqual(data, 4705 'The Øresund Bridge joins Copenhagen to Malmö') 4706 4707 def test_encoding_errors(self): 4708 try: 4709 encoding = 'ascii' 4710 logging.basicConfig(filename='test.log', encoding=encoding, 4711 errors='ignore', 4712 format='%(message)s', level=logging.DEBUG) 4713 4714 self.assertEqual(len(logging.root.handlers), 1) 4715 handler = logging.root.handlers[0] 4716 self.assertIsInstance(handler, logging.FileHandler) 4717 self.assertEqual(handler.encoding, encoding) 4718 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4719 finally: 4720 handler.close() 4721 with open('test.log', encoding='utf-8') as f: 4722 data = f.read().strip() 4723 os.remove('test.log') 4724 self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm') 4725 4726 def test_encoding_errors_default(self): 4727 try: 4728 encoding = 'ascii' 4729 logging.basicConfig(filename='test.log', encoding=encoding, 4730 format='%(message)s', level=logging.DEBUG) 4731 4732 self.assertEqual(len(logging.root.handlers), 1) 4733 handler = logging.root.handlers[0] 4734 self.assertIsInstance(handler, logging.FileHandler) 4735 self.assertEqual(handler.encoding, encoding) 4736 self.assertEqual(handler.errors, 'backslashreplace') 4737 logging.debug('��: ☃️: The Øresund Bridge joins Copenhagen to Malmö') 4738 finally: 4739 handler.close() 4740 with open('test.log', encoding='utf-8') as f: 4741 data = f.read().strip() 4742 os.remove('test.log') 4743 self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund ' 4744 r'Bridge joins Copenhagen to Malm\xf6') 4745 4746 def test_encoding_errors_none(self): 4747 # Specifying None should behave as 'strict' 4748 try: 4749 encoding = 'ascii' 4750 logging.basicConfig(filename='test.log', encoding=encoding, 4751 errors=None, 4752 format='%(message)s', level=logging.DEBUG) 4753 4754 self.assertEqual(len(logging.root.handlers), 1) 4755 handler = logging.root.handlers[0] 4756 self.assertIsInstance(handler, logging.FileHandler) 4757 self.assertEqual(handler.encoding, encoding) 4758 self.assertIsNone(handler.errors) 4759 4760 message = [] 4761 4762 def dummy_handle_error(record): 4763 _, v, _ = sys.exc_info() 4764 message.append(str(v)) 4765 4766 handler.handleError = dummy_handle_error 4767 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4768 self.assertTrue(message) 4769 self.assertIn("'ascii' codec can't encode " 4770 "character '\\xd8' in position 4:", message[0]) 4771 finally: 4772 handler.close() 4773 with open('test.log', encoding='utf-8') as f: 4774 data = f.read().strip() 4775 os.remove('test.log') 4776 # didn't write anything due to the encoding error 4777 self.assertEqual(data, r'') 4778 4779 4780 def _test_log(self, method, level=None): 4781 # logging.root has no handlers so basicConfig should be called 4782 called = [] 4783 4784 old_basic_config = logging.basicConfig 4785 def my_basic_config(*a, **kw): 4786 old_basic_config() 4787 old_level = logging.root.level 4788 logging.root.setLevel(100) # avoid having messages in stderr 4789 self.addCleanup(logging.root.setLevel, old_level) 4790 called.append((a, kw)) 4791 4792 support.patch(self, logging, 'basicConfig', my_basic_config) 4793 4794 log_method = getattr(logging, method) 4795 if level is not None: 4796 log_method(level, "test me") 4797 else: 4798 log_method("test me") 4799 4800 # basicConfig was called with no arguments 4801 self.assertEqual(called, [((), {})]) 4802 4803 def test_log(self): 4804 self._test_log('log', logging.WARNING) 4805 4806 def test_debug(self): 4807 self._test_log('debug') 4808 4809 def test_info(self): 4810 self._test_log('info') 4811 4812 def test_warning(self): 4813 self._test_log('warning') 4814 4815 def test_error(self): 4816 self._test_log('error') 4817 4818 def test_critical(self): 4819 self._test_log('critical') 4820 4821 4822class LoggerAdapterTest(unittest.TestCase): 4823 def setUp(self): 4824 super(LoggerAdapterTest, self).setUp() 4825 old_handler_list = logging._handlerList[:] 4826 4827 self.recording = RecordingHandler() 4828 self.logger = logging.root 4829 self.logger.addHandler(self.recording) 4830 self.addCleanup(self.logger.removeHandler, self.recording) 4831 self.addCleanup(self.recording.close) 4832 4833 def cleanup(): 4834 logging._handlerList[:] = old_handler_list 4835 4836 self.addCleanup(cleanup) 4837 self.addCleanup(logging.shutdown) 4838 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 4839 4840 def test_exception(self): 4841 msg = 'testing exception: %r' 4842 exc = None 4843 try: 4844 1 / 0 4845 except ZeroDivisionError as e: 4846 exc = e 4847 self.adapter.exception(msg, self.recording) 4848 4849 self.assertEqual(len(self.recording.records), 1) 4850 record = self.recording.records[0] 4851 self.assertEqual(record.levelno, logging.ERROR) 4852 self.assertEqual(record.msg, msg) 4853 self.assertEqual(record.args, (self.recording,)) 4854 self.assertEqual(record.exc_info, 4855 (exc.__class__, exc, exc.__traceback__)) 4856 4857 def test_exception_excinfo(self): 4858 try: 4859 1 / 0 4860 except ZeroDivisionError as e: 4861 exc = e 4862 4863 self.adapter.exception('exc_info test', exc_info=exc) 4864 4865 self.assertEqual(len(self.recording.records), 1) 4866 record = self.recording.records[0] 4867 self.assertEqual(record.exc_info, 4868 (exc.__class__, exc, exc.__traceback__)) 4869 4870 def test_critical(self): 4871 msg = 'critical test! %r' 4872 self.adapter.critical(msg, self.recording) 4873 4874 self.assertEqual(len(self.recording.records), 1) 4875 record = self.recording.records[0] 4876 self.assertEqual(record.levelno, logging.CRITICAL) 4877 self.assertEqual(record.msg, msg) 4878 self.assertEqual(record.args, (self.recording,)) 4879 4880 def test_is_enabled_for(self): 4881 old_disable = self.adapter.logger.manager.disable 4882 self.adapter.logger.manager.disable = 33 4883 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 4884 old_disable) 4885 self.assertFalse(self.adapter.isEnabledFor(32)) 4886 4887 def test_has_handlers(self): 4888 self.assertTrue(self.adapter.hasHandlers()) 4889 4890 for handler in self.logger.handlers: 4891 self.logger.removeHandler(handler) 4892 4893 self.assertFalse(self.logger.hasHandlers()) 4894 self.assertFalse(self.adapter.hasHandlers()) 4895 4896 def test_nested(self): 4897 class Adapter(logging.LoggerAdapter): 4898 prefix = 'Adapter' 4899 4900 def process(self, msg, kwargs): 4901 return f"{self.prefix} {msg}", kwargs 4902 4903 msg = 'Adapters can be nested, yo.' 4904 adapter = Adapter(logger=self.logger, extra=None) 4905 adapter_adapter = Adapter(logger=adapter, extra=None) 4906 adapter_adapter.prefix = 'AdapterAdapter' 4907 self.assertEqual(repr(adapter), repr(adapter_adapter)) 4908 adapter_adapter.log(logging.CRITICAL, msg, self.recording) 4909 self.assertEqual(len(self.recording.records), 1) 4910 record = self.recording.records[0] 4911 self.assertEqual(record.levelno, logging.CRITICAL) 4912 self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}") 4913 self.assertEqual(record.args, (self.recording,)) 4914 orig_manager = adapter_adapter.manager 4915 self.assertIs(adapter.manager, orig_manager) 4916 self.assertIs(self.logger.manager, orig_manager) 4917 temp_manager = object() 4918 try: 4919 adapter_adapter.manager = temp_manager 4920 self.assertIs(adapter_adapter.manager, temp_manager) 4921 self.assertIs(adapter.manager, temp_manager) 4922 self.assertIs(self.logger.manager, temp_manager) 4923 finally: 4924 adapter_adapter.manager = orig_manager 4925 self.assertIs(adapter_adapter.manager, orig_manager) 4926 self.assertIs(adapter.manager, orig_manager) 4927 self.assertIs(self.logger.manager, orig_manager) 4928 4929 4930class LoggerTest(BaseTest, AssertErrorMessage): 4931 4932 def setUp(self): 4933 super(LoggerTest, self).setUp() 4934 self.recording = RecordingHandler() 4935 self.logger = logging.Logger(name='blah') 4936 self.logger.addHandler(self.recording) 4937 self.addCleanup(self.logger.removeHandler, self.recording) 4938 self.addCleanup(self.recording.close) 4939 self.addCleanup(logging.shutdown) 4940 4941 def test_set_invalid_level(self): 4942 self.assert_error_message( 4943 TypeError, 'Level not an integer or a valid string: None', 4944 self.logger.setLevel, None) 4945 self.assert_error_message( 4946 TypeError, 'Level not an integer or a valid string: (0, 0)', 4947 self.logger.setLevel, (0, 0)) 4948 4949 def test_exception(self): 4950 msg = 'testing exception: %r' 4951 exc = None 4952 try: 4953 1 / 0 4954 except ZeroDivisionError as e: 4955 exc = e 4956 self.logger.exception(msg, self.recording) 4957 4958 self.assertEqual(len(self.recording.records), 1) 4959 record = self.recording.records[0] 4960 self.assertEqual(record.levelno, logging.ERROR) 4961 self.assertEqual(record.msg, msg) 4962 self.assertEqual(record.args, (self.recording,)) 4963 self.assertEqual(record.exc_info, 4964 (exc.__class__, exc, exc.__traceback__)) 4965 4966 def test_log_invalid_level_with_raise(self): 4967 with support.swap_attr(logging, 'raiseExceptions', True): 4968 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 4969 4970 def test_log_invalid_level_no_raise(self): 4971 with support.swap_attr(logging, 'raiseExceptions', False): 4972 self.logger.log('10', 'test message') # no exception happens 4973 4974 def test_find_caller_with_stack_info(self): 4975 called = [] 4976 support.patch(self, logging.traceback, 'print_stack', 4977 lambda f, file: called.append(file.getvalue())) 4978 4979 self.logger.findCaller(stack_info=True) 4980 4981 self.assertEqual(len(called), 1) 4982 self.assertEqual('Stack (most recent call last):\n', called[0]) 4983 4984 def test_find_caller_with_stacklevel(self): 4985 the_level = 1 4986 4987 def innermost(): 4988 self.logger.warning('test', stacklevel=the_level) 4989 4990 def inner(): 4991 innermost() 4992 4993 def outer(): 4994 inner() 4995 4996 records = self.recording.records 4997 outer() 4998 self.assertEqual(records[-1].funcName, 'innermost') 4999 lineno = records[-1].lineno 5000 the_level += 1 5001 outer() 5002 self.assertEqual(records[-1].funcName, 'inner') 5003 self.assertGreater(records[-1].lineno, lineno) 5004 lineno = records[-1].lineno 5005 the_level += 1 5006 outer() 5007 self.assertEqual(records[-1].funcName, 'outer') 5008 self.assertGreater(records[-1].lineno, lineno) 5009 lineno = records[-1].lineno 5010 the_level += 1 5011 outer() 5012 self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') 5013 self.assertGreater(records[-1].lineno, lineno) 5014 5015 def test_make_record_with_extra_overwrite(self): 5016 name = 'my record' 5017 level = 13 5018 fn = lno = msg = args = exc_info = func = sinfo = None 5019 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 5020 exc_info, func, sinfo) 5021 5022 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 5023 extra = {key: 'some value'} 5024 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 5025 fn, lno, msg, args, exc_info, 5026 extra=extra, sinfo=sinfo) 5027 5028 def test_make_record_with_extra_no_overwrite(self): 5029 name = 'my record' 5030 level = 13 5031 fn = lno = msg = args = exc_info = func = sinfo = None 5032 extra = {'valid_key': 'some value'} 5033 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 5034 exc_info, extra=extra, sinfo=sinfo) 5035 self.assertIn('valid_key', result.__dict__) 5036 5037 def test_has_handlers(self): 5038 self.assertTrue(self.logger.hasHandlers()) 5039 5040 for handler in self.logger.handlers: 5041 self.logger.removeHandler(handler) 5042 self.assertFalse(self.logger.hasHandlers()) 5043 5044 def test_has_handlers_no_propagate(self): 5045 child_logger = logging.getLogger('blah.child') 5046 child_logger.propagate = False 5047 self.assertFalse(child_logger.hasHandlers()) 5048 5049 def test_is_enabled_for(self): 5050 old_disable = self.logger.manager.disable 5051 self.logger.manager.disable = 23 5052 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 5053 self.assertFalse(self.logger.isEnabledFor(22)) 5054 5055 def test_is_enabled_for_disabled_logger(self): 5056 old_disabled = self.logger.disabled 5057 old_disable = self.logger.manager.disable 5058 5059 self.logger.disabled = True 5060 self.logger.manager.disable = 21 5061 5062 self.addCleanup(setattr, self.logger, 'disabled', old_disabled) 5063 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 5064 5065 self.assertFalse(self.logger.isEnabledFor(22)) 5066 5067 def test_root_logger_aliases(self): 5068 root = logging.getLogger() 5069 self.assertIs(root, logging.root) 5070 self.assertIs(root, logging.getLogger(None)) 5071 self.assertIs(root, logging.getLogger('')) 5072 self.assertIs(root, logging.getLogger('root')) 5073 self.assertIs(root, logging.getLogger('foo').root) 5074 self.assertIs(root, logging.getLogger('foo.bar').root) 5075 self.assertIs(root, logging.getLogger('foo').parent) 5076 5077 self.assertIsNot(root, logging.getLogger('\0')) 5078 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 5079 5080 def test_invalid_names(self): 5081 self.assertRaises(TypeError, logging.getLogger, any) 5082 self.assertRaises(TypeError, logging.getLogger, b'foo') 5083 5084 def test_pickling(self): 5085 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 5086 for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'): 5087 logger = logging.getLogger(name) 5088 s = pickle.dumps(logger, proto) 5089 unpickled = pickle.loads(s) 5090 self.assertIs(unpickled, logger) 5091 5092 def test_caching(self): 5093 root = self.root_logger 5094 logger1 = logging.getLogger("abc") 5095 logger2 = logging.getLogger("abc.def") 5096 5097 # Set root logger level and ensure cache is empty 5098 root.setLevel(logging.ERROR) 5099 self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR) 5100 self.assertEqual(logger2._cache, {}) 5101 5102 # Ensure cache is populated and calls are consistent 5103 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 5104 self.assertFalse(logger2.isEnabledFor(logging.DEBUG)) 5105 self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False}) 5106 self.assertEqual(root._cache, {}) 5107 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 5108 5109 # Ensure root cache gets populated 5110 self.assertEqual(root._cache, {}) 5111 self.assertTrue(root.isEnabledFor(logging.ERROR)) 5112 self.assertEqual(root._cache, {logging.ERROR: True}) 5113 5114 # Set parent logger level and ensure caches are emptied 5115 logger1.setLevel(logging.CRITICAL) 5116 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 5117 self.assertEqual(logger2._cache, {}) 5118 5119 # Ensure logger2 uses parent logger's effective level 5120 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 5121 5122 # Set level to NOTSET and ensure caches are empty 5123 logger2.setLevel(logging.NOTSET) 5124 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 5125 self.assertEqual(logger2._cache, {}) 5126 self.assertEqual(logger1._cache, {}) 5127 self.assertEqual(root._cache, {}) 5128 5129 # Verify logger2 follows parent and not root 5130 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 5131 self.assertTrue(logger2.isEnabledFor(logging.CRITICAL)) 5132 self.assertFalse(logger1.isEnabledFor(logging.ERROR)) 5133 self.assertTrue(logger1.isEnabledFor(logging.CRITICAL)) 5134 self.assertTrue(root.isEnabledFor(logging.ERROR)) 5135 5136 # Disable logging in manager and ensure caches are clear 5137 logging.disable() 5138 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 5139 self.assertEqual(logger2._cache, {}) 5140 self.assertEqual(logger1._cache, {}) 5141 self.assertEqual(root._cache, {}) 5142 5143 # Ensure no loggers are enabled 5144 self.assertFalse(logger1.isEnabledFor(logging.CRITICAL)) 5145 self.assertFalse(logger2.isEnabledFor(logging.CRITICAL)) 5146 self.assertFalse(root.isEnabledFor(logging.CRITICAL)) 5147 5148 5149class BaseFileTest(BaseTest): 5150 "Base class for handler tests that write log files" 5151 5152 def setUp(self): 5153 BaseTest.setUp(self) 5154 fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") 5155 os.close(fd) 5156 self.rmfiles = [] 5157 5158 def tearDown(self): 5159 for fn in self.rmfiles: 5160 os.unlink(fn) 5161 if os.path.exists(self.fn): 5162 os.unlink(self.fn) 5163 BaseTest.tearDown(self) 5164 5165 def assertLogFile(self, filename): 5166 "Assert a log file is there and register it for deletion" 5167 self.assertTrue(os.path.exists(filename), 5168 msg="Log file %r does not exist" % filename) 5169 self.rmfiles.append(filename) 5170 5171 def next_rec(self): 5172 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 5173 self.next_message(), None, None, None) 5174 5175class FileHandlerTest(BaseFileTest): 5176 def test_delay(self): 5177 os.unlink(self.fn) 5178 fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True) 5179 self.assertIsNone(fh.stream) 5180 self.assertFalse(os.path.exists(self.fn)) 5181 fh.handle(logging.makeLogRecord({})) 5182 self.assertIsNotNone(fh.stream) 5183 self.assertTrue(os.path.exists(self.fn)) 5184 fh.close() 5185 5186 def test_emit_after_closing_in_write_mode(self): 5187 # Issue #42378 5188 os.unlink(self.fn) 5189 fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w') 5190 fh.setFormatter(logging.Formatter('%(message)s')) 5191 fh.emit(self.next_rec()) # '1' 5192 fh.close() 5193 fh.emit(self.next_rec()) # '2' 5194 with open(self.fn) as fp: 5195 self.assertEqual(fp.read().strip(), '1') 5196 5197class RotatingFileHandlerTest(BaseFileTest): 5198 def test_should_not_rollover(self): 5199 # If maxbytes is zero rollover never occurs 5200 rh = logging.handlers.RotatingFileHandler( 5201 self.fn, encoding="utf-8", maxBytes=0) 5202 self.assertFalse(rh.shouldRollover(None)) 5203 rh.close() 5204 # bpo-45401 - test with special file 5205 # We set maxBytes to 1 so that rollover would normally happen, except 5206 # for the check for regular files 5207 rh = logging.handlers.RotatingFileHandler( 5208 os.devnull, encoding="utf-8", maxBytes=1) 5209 self.assertFalse(rh.shouldRollover(self.next_rec())) 5210 rh.close() 5211 5212 def test_should_rollover(self): 5213 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1) 5214 self.assertTrue(rh.shouldRollover(self.next_rec())) 5215 rh.close() 5216 5217 def test_file_created(self): 5218 # checks that the file is created and assumes it was created 5219 # by us 5220 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8") 5221 rh.emit(self.next_rec()) 5222 self.assertLogFile(self.fn) 5223 rh.close() 5224 5225 def test_rollover_filenames(self): 5226 def namer(name): 5227 return name + ".test" 5228 rh = logging.handlers.RotatingFileHandler( 5229 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5230 rh.namer = namer 5231 rh.emit(self.next_rec()) 5232 self.assertLogFile(self.fn) 5233 rh.emit(self.next_rec()) 5234 self.assertLogFile(namer(self.fn + ".1")) 5235 rh.emit(self.next_rec()) 5236 self.assertLogFile(namer(self.fn + ".2")) 5237 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 5238 rh.close() 5239 5240 def test_namer_rotator_inheritance(self): 5241 class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler): 5242 def namer(self, name): 5243 return name + ".test" 5244 5245 def rotator(self, source, dest): 5246 if os.path.exists(source): 5247 os.replace(source, dest + ".rotated") 5248 5249 rh = HandlerWithNamerAndRotator( 5250 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5251 self.assertEqual(rh.namer(self.fn), self.fn + ".test") 5252 rh.emit(self.next_rec()) 5253 self.assertLogFile(self.fn) 5254 rh.emit(self.next_rec()) 5255 self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated") 5256 self.assertFalse(os.path.exists(rh.namer(self.fn + ".1"))) 5257 rh.close() 5258 5259 @support.requires_zlib() 5260 def test_rotator(self): 5261 def namer(name): 5262 return name + ".gz" 5263 5264 def rotator(source, dest): 5265 with open(source, "rb") as sf: 5266 data = sf.read() 5267 compressed = zlib.compress(data, 9) 5268 with open(dest, "wb") as df: 5269 df.write(compressed) 5270 os.remove(source) 5271 5272 rh = logging.handlers.RotatingFileHandler( 5273 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5274 rh.rotator = rotator 5275 rh.namer = namer 5276 m1 = self.next_rec() 5277 rh.emit(m1) 5278 self.assertLogFile(self.fn) 5279 m2 = self.next_rec() 5280 rh.emit(m2) 5281 fn = namer(self.fn + ".1") 5282 self.assertLogFile(fn) 5283 newline = os.linesep 5284 with open(fn, "rb") as f: 5285 compressed = f.read() 5286 data = zlib.decompress(compressed) 5287 self.assertEqual(data.decode("ascii"), m1.msg + newline) 5288 rh.emit(self.next_rec()) 5289 fn = namer(self.fn + ".2") 5290 self.assertLogFile(fn) 5291 with open(fn, "rb") as f: 5292 compressed = f.read() 5293 data = zlib.decompress(compressed) 5294 self.assertEqual(data.decode("ascii"), m1.msg + newline) 5295 rh.emit(self.next_rec()) 5296 fn = namer(self.fn + ".2") 5297 with open(fn, "rb") as f: 5298 compressed = f.read() 5299 data = zlib.decompress(compressed) 5300 self.assertEqual(data.decode("ascii"), m2.msg + newline) 5301 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 5302 rh.close() 5303 5304class TimedRotatingFileHandlerTest(BaseFileTest): 5305 def test_should_not_rollover(self): 5306 # See bpo-45401. Should only ever rollover regular files 5307 fh = logging.handlers.TimedRotatingFileHandler( 5308 os.devnull, 'S', encoding="utf-8", backupCount=1) 5309 time.sleep(1.1) # a little over a second ... 5310 r = logging.makeLogRecord({'msg': 'testing - device file'}) 5311 self.assertFalse(fh.shouldRollover(r)) 5312 fh.close() 5313 5314 # other test methods added below 5315 def test_rollover(self): 5316 fh = logging.handlers.TimedRotatingFileHandler( 5317 self.fn, 'S', encoding="utf-8", backupCount=1) 5318 fmt = logging.Formatter('%(asctime)s %(message)s') 5319 fh.setFormatter(fmt) 5320 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 5321 fh.emit(r1) 5322 self.assertLogFile(self.fn) 5323 time.sleep(1.1) # a little over a second ... 5324 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 5325 fh.emit(r2) 5326 fh.close() 5327 # At this point, we should have a recent rotated file which we 5328 # can test for the existence of. However, in practice, on some 5329 # machines which run really slowly, we don't know how far back 5330 # in time to go to look for the log file. So, we go back a fair 5331 # bit, and stop as soon as we see a rotated file. In theory this 5332 # could of course still fail, but the chances are lower. 5333 found = False 5334 now = datetime.datetime.now() 5335 GO_BACK = 5 * 60 # seconds 5336 for secs in range(GO_BACK): 5337 prev = now - datetime.timedelta(seconds=secs) 5338 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 5339 found = os.path.exists(fn) 5340 if found: 5341 self.rmfiles.append(fn) 5342 break 5343 msg = 'No rotated files found, went back %d seconds' % GO_BACK 5344 if not found: 5345 # print additional diagnostics 5346 dn, fn = os.path.split(self.fn) 5347 files = [f for f in os.listdir(dn) if f.startswith(fn)] 5348 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 5349 print('The only matching files are: %s' % files, file=sys.stderr) 5350 for f in files: 5351 print('Contents of %s:' % f) 5352 path = os.path.join(dn, f) 5353 with open(path, 'r') as tf: 5354 print(tf.read()) 5355 self.assertTrue(found, msg=msg) 5356 5357 def test_invalid(self): 5358 assertRaises = self.assertRaises 5359 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5360 self.fn, 'X', encoding="utf-8", delay=True) 5361 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5362 self.fn, 'W', encoding="utf-8", delay=True) 5363 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5364 self.fn, 'W7', encoding="utf-8", delay=True) 5365 5366 def test_compute_rollover_daily_attime(self): 5367 currentTime = 0 5368 atTime = datetime.time(12, 0, 0) 5369 rh = logging.handlers.TimedRotatingFileHandler( 5370 self.fn, encoding="utf-8", when='MIDNIGHT', interval=1, backupCount=0, 5371 utc=True, atTime=atTime) 5372 try: 5373 actual = rh.computeRollover(currentTime) 5374 self.assertEqual(actual, currentTime + 12 * 60 * 60) 5375 5376 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 5377 self.assertEqual(actual, currentTime + 36 * 60 * 60) 5378 finally: 5379 rh.close() 5380 5381 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 5382 def test_compute_rollover_weekly_attime(self): 5383 currentTime = int(time.time()) 5384 today = currentTime - currentTime % 86400 5385 5386 atTime = datetime.time(12, 0, 0) 5387 5388 wday = time.gmtime(today).tm_wday 5389 for day in range(7): 5390 rh = logging.handlers.TimedRotatingFileHandler( 5391 self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0, 5392 utc=True, atTime=atTime) 5393 try: 5394 if wday > day: 5395 # The rollover day has already passed this week, so we 5396 # go over into next week 5397 expected = (7 - wday + day) 5398 else: 5399 expected = (day - wday) 5400 # At this point expected is in days from now, convert to seconds 5401 expected *= 24 * 60 * 60 5402 # Add in the rollover time 5403 expected += 12 * 60 * 60 5404 # Add in adjustment for today 5405 expected += today 5406 actual = rh.computeRollover(today) 5407 if actual != expected: 5408 print('failed in timezone: %d' % time.timezone) 5409 print('local vars: %s' % locals()) 5410 self.assertEqual(actual, expected) 5411 if day == wday: 5412 # goes into following week 5413 expected += 7 * 24 * 60 * 60 5414 actual = rh.computeRollover(today + 13 * 60 * 60) 5415 if actual != expected: 5416 print('failed in timezone: %d' % time.timezone) 5417 print('local vars: %s' % locals()) 5418 self.assertEqual(actual, expected) 5419 finally: 5420 rh.close() 5421 5422 def test_compute_files_to_delete(self): 5423 # See bpo-46063 for background 5424 wd = tempfile.mkdtemp(prefix='test_logging_') 5425 self.addCleanup(shutil.rmtree, wd) 5426 times = [] 5427 dt = datetime.datetime.now() 5428 for i in range(10): 5429 times.append(dt.strftime('%Y-%m-%d_%H-%M-%S')) 5430 dt += datetime.timedelta(seconds=5) 5431 prefixes = ('a.b', 'a.b.c', 'd.e', 'd.e.f') 5432 files = [] 5433 rotators = [] 5434 for prefix in prefixes: 5435 p = os.path.join(wd, '%s.log' % prefix) 5436 rotator = logging.handlers.TimedRotatingFileHandler(p, when='s', 5437 interval=5, 5438 backupCount=7, 5439 delay=True) 5440 rotators.append(rotator) 5441 if prefix.startswith('a.b'): 5442 for t in times: 5443 files.append('%s.log.%s' % (prefix, t)) 5444 else: 5445 rotator.namer = lambda name: name.replace('.log', '') + '.log' 5446 for t in times: 5447 files.append('%s.%s.log' % (prefix, t)) 5448 # Create empty files 5449 for fn in files: 5450 p = os.path.join(wd, fn) 5451 with open(p, 'wb') as f: 5452 pass 5453 # Now the checks that only the correct files are offered up for deletion 5454 for i, prefix in enumerate(prefixes): 5455 rotator = rotators[i] 5456 candidates = rotator.getFilesToDelete() 5457 self.assertEqual(len(candidates), 3) 5458 if prefix.startswith('a.b'): 5459 p = '%s.log.' % prefix 5460 for c in candidates: 5461 d, fn = os.path.split(c) 5462 self.assertTrue(fn.startswith(p)) 5463 else: 5464 for c in candidates: 5465 d, fn = os.path.split(c) 5466 self.assertTrue(fn.endswith('.log')) 5467 self.assertTrue(fn.startswith(prefix + '.') and 5468 fn[len(prefix) + 2].isdigit()) 5469 5470 5471def secs(**kw): 5472 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 5473 5474for when, exp in (('S', 1), 5475 ('M', 60), 5476 ('H', 60 * 60), 5477 ('D', 60 * 60 * 24), 5478 ('MIDNIGHT', 60 * 60 * 24), 5479 # current time (epoch start) is a Thursday, W0 means Monday 5480 ('W0', secs(days=4, hours=24)), 5481 ): 5482 def test_compute_rollover(self, when=when, exp=exp): 5483 rh = logging.handlers.TimedRotatingFileHandler( 5484 self.fn, encoding="utf-8", when=when, interval=1, backupCount=0, utc=True) 5485 currentTime = 0.0 5486 actual = rh.computeRollover(currentTime) 5487 if exp != actual: 5488 # Failures occur on some systems for MIDNIGHT and W0. 5489 # Print detailed calculation for MIDNIGHT so we can try to see 5490 # what's going on 5491 if when == 'MIDNIGHT': 5492 try: 5493 if rh.utc: 5494 t = time.gmtime(currentTime) 5495 else: 5496 t = time.localtime(currentTime) 5497 currentHour = t[3] 5498 currentMinute = t[4] 5499 currentSecond = t[5] 5500 # r is the number of seconds left between now and midnight 5501 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 5502 currentMinute) * 60 + 5503 currentSecond) 5504 result = currentTime + r 5505 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 5506 print('currentHour: %s' % currentHour, file=sys.stderr) 5507 print('currentMinute: %s' % currentMinute, file=sys.stderr) 5508 print('currentSecond: %s' % currentSecond, file=sys.stderr) 5509 print('r: %s' % r, file=sys.stderr) 5510 print('result: %s' % result, file=sys.stderr) 5511 except Exception: 5512 print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) 5513 self.assertEqual(exp, actual) 5514 rh.close() 5515 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 5516 5517 5518@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 5519class NTEventLogHandlerTest(BaseTest): 5520 def test_basic(self): 5521 logtype = 'Application' 5522 elh = win32evtlog.OpenEventLog(None, logtype) 5523 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 5524 5525 try: 5526 h = logging.handlers.NTEventLogHandler('test_logging') 5527 except pywintypes.error as e: 5528 if e.winerror == 5: # access denied 5529 raise unittest.SkipTest('Insufficient privileges to run test') 5530 raise 5531 5532 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 5533 h.handle(r) 5534 h.close() 5535 # Now see if the event is recorded 5536 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 5537 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 5538 win32evtlog.EVENTLOG_SEQUENTIAL_READ 5539 found = False 5540 GO_BACK = 100 5541 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 5542 for e in events: 5543 if e.SourceName != 'test_logging': 5544 continue 5545 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 5546 if msg != 'Test Log Message\r\n': 5547 continue 5548 found = True 5549 break 5550 msg = 'Record not found in event log, went back %d records' % GO_BACK 5551 self.assertTrue(found, msg=msg) 5552 5553 5554class MiscTestCase(unittest.TestCase): 5555 def test__all__(self): 5556 not_exported = { 5557 'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe', 5558 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle', 5559 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root', 5560 'threading'} 5561 support.check__all__(self, logging, not_exported=not_exported) 5562 5563 5564# Set the locale to the platform-dependent default. I have no idea 5565# why the test does this, but in any case we save the current locale 5566# first and restore it at the end. 5567def setUpModule(): 5568 cm = support.run_with_locale('LC_ALL', '') 5569 cm.__enter__() 5570 unittest.addModuleCleanup(cm.__exit__, None, None, None) 5571 5572 5573if __name__ == "__main__": 5574 unittest.main() 5575