diff -r ffa851df0825 -r 2fb8b9db1c86 symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_logging.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_logging.py Fri Jul 31 15:01:17 2009 +0100 @@ -0,0 +1,899 @@ +#!/usr/bin/env python +# +# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved. +# +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose and without fee is hereby granted, +# provided that the above copyright notice appear in all copies and that +# both that copyright notice and this permission notice appear in +# supporting documentation, and that the name of Vinay Sajip +# not be used in advertising or publicity pertaining to distribution +# of the software without specific, written prior permission. +# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING +# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL +# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR +# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER +# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT +# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +"""Test harness for the logging module. Run all tests. + +Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved. +""" + +import logging +import logging.handlers +import logging.config + +import copy +import cPickle +import cStringIO +import gc +import os +import re +import select +import socket +from SocketServer import ThreadingTCPServer, StreamRequestHandler +import string +import struct +import sys +import tempfile +from test.test_support import captured_stdout, run_with_locale, run_unittest +import textwrap +import threading +import time +import types +import unittest +import weakref + + +class BaseTest(unittest.TestCase): + + """Base class for logging tests.""" + + log_format = "%(name)s -> %(levelname)s: %(message)s" + expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" + message_num = 0 + + def setUp(self): + """Setup the default logging stream to an internal StringIO instance, + so that we can examine log output as we want.""" + logger_dict = logging.getLogger().manager.loggerDict + logging._acquireLock() + try: + self.saved_handlers = logging._handlers.copy() + self.saved_handler_list = logging._handlerList[:] + self.saved_loggers = logger_dict.copy() + self.saved_level_names = logging._levelNames.copy() + finally: + logging._releaseLock() + + self.root_logger = logging.getLogger("") + self.original_logging_level = self.root_logger.getEffectiveLevel() + + self.stream = cStringIO.StringIO() + self.root_logger.setLevel(logging.DEBUG) + self.root_hdlr = logging.StreamHandler(self.stream) + self.root_formatter = logging.Formatter(self.log_format) + self.root_hdlr.setFormatter(self.root_formatter) + self.root_logger.addHandler(self.root_hdlr) + + def tearDown(self): + """Remove our logging stream, and restore the original logging + level.""" + self.stream.close() + self.root_logger.removeHandler(self.root_hdlr) + self.root_logger.setLevel(self.original_logging_level) + logging._acquireLock() + try: + logging._levelNames.clear() + logging._levelNames.update(self.saved_level_names) + logging._handlers.clear() + logging._handlers.update(self.saved_handlers) + logging._handlerList[:] = self.saved_handler_list + loggerDict = logging.getLogger().manager.loggerDict + loggerDict.clear() + loggerDict.update(self.saved_loggers) + finally: + logging._releaseLock() + + def assert_log_lines(self, expected_values, stream=None): + """Match the collected log lines against the regular expression + self.expected_log_pat, and compare the extracted group values to + the expected_values list of tuples.""" + stream = stream or self.stream + pat = re.compile(self.expected_log_pat) + try: + stream.reset() + actual_lines = stream.readlines() + except AttributeError: + # StringIO.StringIO lacks a reset() method. + actual_lines = stream.getvalue().splitlines() + self.assertEquals(len(actual_lines), len(expected_values)) + for actual, expected in zip(actual_lines, expected_values): + match = pat.search(actual) + if not match: + self.fail("Log line does not match expected pattern:\n" + + actual) + self.assertEquals(tuple(match.groups()), expected) + s = stream.read() + if s: + self.fail("Remaining output at end of log stream:\n" + s) + + def next_message(self): + """Generate a message consisting solely of an auto-incrementing + integer.""" + self.message_num += 1 + return "%d" % self.message_num + + +class BuiltinLevelsTest(BaseTest): + """Test builtin levels and their inheritance.""" + + def test_flat(self): + #Logging levels in a flat logger namespace. + m = self.next_message + + ERR = logging.getLogger("ERR") + ERR.setLevel(logging.ERROR) + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + DEB = logging.getLogger("DEB") + DEB.setLevel(logging.DEBUG) + + # These should log. + ERR.log(logging.CRITICAL, m()) + ERR.error(m()) + + INF.log(logging.CRITICAL, m()) + INF.error(m()) + INF.warn(m()) + INF.info(m()) + + DEB.log(logging.CRITICAL, m()) + DEB.error(m()) + DEB.warn (m()) + DEB.info (m()) + DEB.debug(m()) + + # These should not log. + ERR.warn(m()) + ERR.info(m()) + ERR.debug(m()) + + INF.debug(m()) + + self.assert_log_lines([ + ('ERR', 'CRITICAL', '1'), + ('ERR', 'ERROR', '2'), + ('INF', 'CRITICAL', '3'), + ('INF', 'ERROR', '4'), + ('INF', 'WARNING', '5'), + ('INF', 'INFO', '6'), + ('DEB', 'CRITICAL', '7'), + ('DEB', 'ERROR', '8'), + ('DEB', 'WARNING', '9'), + ('DEB', 'INFO', '10'), + ('DEB', 'DEBUG', '11'), + ]) + + def test_nested_explicit(self): + # Logging levels in a nested namespace, all explicitly set. + m = self.next_message + + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + INF_ERR = logging.getLogger("INF.ERR") + INF_ERR.setLevel(logging.ERROR) + + # These should log. + INF_ERR.log(logging.CRITICAL, m()) + INF_ERR.error(m()) + + # These should not log. + INF_ERR.warn(m()) + INF_ERR.info(m()) + INF_ERR.debug(m()) + + self.assert_log_lines([ + ('INF.ERR', 'CRITICAL', '1'), + ('INF.ERR', 'ERROR', '2'), + ]) + + def test_nested_inherited(self): + #Logging levels in a nested namespace, inherited from parent loggers. + m = self.next_message + + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + INF_ERR = logging.getLogger("INF.ERR") + INF_ERR.setLevel(logging.ERROR) + INF_UNDEF = logging.getLogger("INF.UNDEF") + INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") + UNDEF = logging.getLogger("UNDEF") + + # These should log. + INF_UNDEF.log(logging.CRITICAL, m()) + INF_UNDEF.error(m()) + INF_UNDEF.warn(m()) + INF_UNDEF.info(m()) + INF_ERR_UNDEF.log(logging.CRITICAL, m()) + INF_ERR_UNDEF.error(m()) + + # These should not log. + INF_UNDEF.debug(m()) + INF_ERR_UNDEF.warn(m()) + INF_ERR_UNDEF.info(m()) + INF_ERR_UNDEF.debug(m()) + + self.assert_log_lines([ + ('INF.UNDEF', 'CRITICAL', '1'), + ('INF.UNDEF', 'ERROR', '2'), + ('INF.UNDEF', 'WARNING', '3'), + ('INF.UNDEF', 'INFO', '4'), + ('INF.ERR.UNDEF', 'CRITICAL', '5'), + ('INF.ERR.UNDEF', 'ERROR', '6'), + ]) + + def test_nested_with_virtual_parent(self): + # Logging levels when some parent does not exist yet. + m = self.next_message + + INF = logging.getLogger("INF") + GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") + CHILD = logging.getLogger("INF.BADPARENT") + INF.setLevel(logging.INFO) + + # These should log. + GRANDCHILD.log(logging.FATAL, m()) + GRANDCHILD.info(m()) + CHILD.log(logging.FATAL, m()) + CHILD.info(m()) + + # These should not log. + GRANDCHILD.debug(m()) + CHILD.debug(m()) + + self.assert_log_lines([ + ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), + ('INF.BADPARENT.UNDEF', 'INFO', '2'), + ('INF.BADPARENT', 'CRITICAL', '3'), + ('INF.BADPARENT', 'INFO', '4'), + ]) + + +class BasicFilterTest(BaseTest): + + """Test the bundled Filter class.""" + + def test_filter(self): + # Only messages satisfying the specified criteria pass through the + # filter. + filter_ = logging.Filter("spam.eggs") + handler = self.root_logger.handlers[0] + try: + handler.addFilter(filter_) + spam = logging.getLogger("spam") + spam_eggs = logging.getLogger("spam.eggs") + spam_eggs_fish = logging.getLogger("spam.eggs.fish") + spam_bakedbeans = logging.getLogger("spam.bakedbeans") + + spam.info(self.next_message()) + spam_eggs.info(self.next_message()) # Good. + spam_eggs_fish.info(self.next_message()) # Good. + spam_bakedbeans.info(self.next_message()) + + self.assert_log_lines([ + ('spam.eggs', 'INFO', '2'), + ('spam.eggs.fish', 'INFO', '3'), + ]) + finally: + handler.removeFilter(filter_) + + +# +# First, we define our levels. There can be as many as you want - the only +# limitations are that they should be integers, the lowest should be > 0 and +# larger values mean less information being logged. If you need specific +# level values which do not fit into these limitations, you can use a +# mapping dictionary to convert between your application levels and the +# logging system. +# +SILENT = 120 +TACITURN = 119 +TERSE = 118 +EFFUSIVE = 117 +SOCIABLE = 116 +VERBOSE = 115 +TALKATIVE = 114 +GARRULOUS = 113 +CHATTERBOX = 112 +BORING = 111 + +LEVEL_RANGE = range(BORING, SILENT + 1) + +# +# Next, we define names for our levels. You don't need to do this - in which +# case the system will use "Level n" to denote the text for the level. +# +my_logging_levels = { + SILENT : 'Silent', + TACITURN : 'Taciturn', + TERSE : 'Terse', + EFFUSIVE : 'Effusive', + SOCIABLE : 'Sociable', + VERBOSE : 'Verbose', + TALKATIVE : 'Talkative', + GARRULOUS : 'Garrulous', + CHATTERBOX : 'Chatterbox', + BORING : 'Boring', +} + +class GarrulousFilter(logging.Filter): + + """A filter which blocks garrulous messages.""" + + def filter(self, record): + return record.levelno != GARRULOUS + +class VerySpecificFilter(logging.Filter): + + """A filter which blocks sociable and taciturn messages.""" + + def filter(self, record): + return record.levelno not in [SOCIABLE, TACITURN] + + +class CustomLevelsAndFiltersTest(BaseTest): + + """Test various filtering possibilities with custom logging levels.""" + + # Skip the logger name group. + expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + + def setUp(self): + BaseTest.setUp(self) + for k, v in my_logging_levels.items(): + logging.addLevelName(k, v) + + def log_at_all_levels(self, logger): + for lvl in LEVEL_RANGE: + logger.log(lvl, self.next_message()) + + def test_logger_filter(self): + # Filter at logger level. + self.root_logger.setLevel(VERBOSE) + # Levels >= 'Verbose' are good. + self.log_at_all_levels(self.root_logger) + self.assert_log_lines([ + ('Verbose', '5'), + ('Sociable', '6'), + ('Effusive', '7'), + ('Terse', '8'), + ('Taciturn', '9'), + ('Silent', '10'), + ]) + + def test_handler_filter(self): + # Filter at handler level. + self.root_logger.handlers[0].setLevel(SOCIABLE) + try: + # Levels >= 'Sociable' are good. + self.log_at_all_levels(self.root_logger) + self.assert_log_lines([ + ('Sociable', '6'), + ('Effusive', '7'), + ('Terse', '8'), + ('Taciturn', '9'), + ('Silent', '10'), + ]) + finally: + self.root_logger.handlers[0].setLevel(logging.NOTSET) + + def test_specific_filters(self): + # Set a specific filter object on the handler, and then add another + # filter object on the logger itself. + handler = self.root_logger.handlers[0] + specific_filter = None + garr = GarrulousFilter() + handler.addFilter(garr) + try: + self.log_at_all_levels(self.root_logger) + first_lines = [ + # Notice how 'Garrulous' is missing + ('Boring', '1'), + ('Chatterbox', '2'), + ('Talkative', '4'), + ('Verbose', '5'), + ('Sociable', '6'), + ('Effusive', '7'), + ('Terse', '8'), + ('Taciturn', '9'), + ('Silent', '10'), + ] + self.assert_log_lines(first_lines) + + specific_filter = VerySpecificFilter() + self.root_logger.addFilter(specific_filter) + self.log_at_all_levels(self.root_logger) + self.assert_log_lines(first_lines + [ + # Not only 'Garrulous' is still missing, but also 'Sociable' + # and 'Taciturn' + ('Boring', '11'), + ('Chatterbox', '12'), + ('Talkative', '14'), + ('Verbose', '15'), + ('Effusive', '17'), + ('Terse', '18'), + ('Silent', '20'), + ]) + finally: + if specific_filter: + self.root_logger.removeFilter(specific_filter) + handler.removeFilter(garr) + + +class MemoryHandlerTest(BaseTest): + + """Tests for the MemoryHandler.""" + + # Do not bother with a logger name group. + expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + + def setUp(self): + BaseTest.setUp(self) + self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, + self.root_hdlr) + self.mem_logger = logging.getLogger('mem') + self.mem_logger.propagate = 0 + self.mem_logger.addHandler(self.mem_hdlr) + + def tearDown(self): + self.mem_hdlr.close() + BaseTest.tearDown(self) + + def test_flush(self): + # The memory handler flushes to its target handler based on specific + # criteria (message count and message level). + self.mem_logger.debug(self.next_message()) + self.assert_log_lines([]) + self.mem_logger.info(self.next_message()) + self.assert_log_lines([]) + # This will flush because the level is >= logging.WARNING + self.mem_logger.warn(self.next_message()) + lines = [ + ('DEBUG', '1'), + ('INFO', '2'), + ('WARNING', '3'), + ] + self.assert_log_lines(lines) + for n in (4, 14): + for i in range(9): + self.mem_logger.debug(self.next_message()) + self.assert_log_lines(lines) + # This will flush because it's the 10th message since the last + # flush. + self.mem_logger.debug(self.next_message()) + lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] + self.assert_log_lines(lines) + + self.mem_logger.debug(self.next_message()) + self.assert_log_lines(lines) + + +class ExceptionFormatter(logging.Formatter): + """A special exception formatter.""" + def formatException(self, ei): + return "Got a [%s]" % ei[0].__name__ + + +class ConfigFileTest(BaseTest): + + """Reading logging config from a .ini-style config file.""" + + expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + + # config0 is a standard configuration. + config0 = """ + [loggers] + keys=root + + [handlers] + keys=hand1 + + [formatters] + keys=form1 + + [logger_root] + level=WARNING + handlers=hand1 + + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) + + [formatter_form1] + format=%(levelname)s ++ %(message)s + datefmt= + """ + + # config1 adds a little to the standard configuration. + config1 = """ + [loggers] + keys=root,parser + + [handlers] + keys=hand1 + + [formatters] + keys=form1 + + [logger_root] + level=WARNING + handlers= + + [logger_parser] + level=DEBUG + handlers=hand1 + propagate=1 + qualname=compiler.parser + + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) + + [formatter_form1] + format=%(levelname)s ++ %(message)s + datefmt= + """ + + # config2 has a subtle configuration error that should be reported + config2 = config1.replace("sys.stdout", "sys.stbout") + + # config3 has a less subtle configuration error + config3 = config1.replace("formatter=form1", "formatter=misspelled_name") + + # config4 specifies a custom formatter class to be loaded + config4 = """ + [loggers] + keys=root + + [handlers] + keys=hand1 + + [formatters] + keys=form1 + + [logger_root] + level=NOTSET + handlers=hand1 + + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) + + [formatter_form1] + class=""" + __name__ + """.ExceptionFormatter + format=%(levelname)s:%(name)s:%(message)s + datefmt= + """ + + # config5 specifies a custom handler class to be loaded + config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') + + # config6 uses ', ' delimiters in the handlers and formatters sections + config6 = """ + [loggers] + keys=root,parser + + [handlers] + keys=hand1, hand2 + + [formatters] + keys=form1, form2 + + [logger_root] + level=WARNING + handlers= + + [logger_parser] + level=DEBUG + handlers=hand1 + propagate=1 + qualname=compiler.parser + + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) + + [handler_hand2] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stderr,) + + [formatter_form1] + format=%(levelname)s ++ %(message)s + datefmt= + + [formatter_form2] + format=%(message)s + datefmt= + """ + + def apply_config(self, conf): + try: + fn = tempfile.mktemp(".ini") + f = open(fn, "w") + f.write(textwrap.dedent(conf)) + f.close() + logging.config.fileConfig(fn) + finally: + os.remove(fn) + + def test_config0_ok(self): + # A simple config file which overrides the default settings. + with captured_stdout() as output: + self.apply_config(self.config0) + logger = logging.getLogger() + # Won't output anything + logger.info(self.next_message()) + # Outputs a message + logger.error(self.next_message()) + self.assert_log_lines([ + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + def test_config1_ok(self, config=config1): + # A config file defining a sub-parser as well. + with captured_stdout() as output: + self.apply_config(config) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + def test_config2_failure(self): + # A simple config file which overrides the default settings. + self.assertRaises(StandardError, self.apply_config, self.config2) + + def test_config3_failure(self): + # A simple config file which overrides the default settings. + self.assertRaises(StandardError, self.apply_config, self.config3) + + def test_config4_ok(self): + # A config file specifying a custom formatter class. + with captured_stdout() as output: + self.apply_config(self.config4) + logger = logging.getLogger() + try: + raise RuntimeError() + except RuntimeError: + logging.exception("just testing") + sys.stdout.seek(0) + self.assertEquals(output.getvalue(), + "ERROR:root:just testing\nGot a [RuntimeError]\n") + # Original logger output is empty + self.assert_log_lines([]) + + def test_config5_ok(self): + self.test_config1_ok(config=self.config5) + + def test_config6_ok(self): + self.test_config1_ok(config=self.config6) + +class LogRecordStreamHandler(StreamRequestHandler): + + """Handler for a streaming logging request. It saves the log message in the + TCP server's 'log_output' attribute.""" + + TCP_LOG_END = "!!!END!!!" + + def handle(self): + """Handle multiple requests - each expected to be of 4-byte length, + followed by the LogRecord in pickle format. Logs the record + according to whatever policy is configured locally.""" + while True: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unpickle(chunk) + record = logging.makeLogRecord(obj) + self.handle_log_record(record) + + def unpickle(self, data): + return cPickle.loads(data) + + def handle_log_record(self, record): + # If the end-of-messages sentinel is seen, tell the server to + # terminate. + if self.TCP_LOG_END in record.msg: + self.server.abort = 1 + return + self.server.log_output += record.msg + "\n" + + +class LogRecordSocketReceiver(ThreadingTCPServer): + + """A simple-minded TCP socket-based logging receiver suitable for test + purposes.""" + + allow_reuse_address = 1 + log_output = "" + + def __init__(self, host='localhost', + port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, + handler=LogRecordStreamHandler): + ThreadingTCPServer.__init__(self, (host, port), handler) + self.abort = False + self.timeout = 0.1 + self.finished = threading.Event() + + def serve_until_stopped(self): + while not self.abort: + rd, wr, ex = select.select([self.socket.fileno()], [], [], + self.timeout) + if rd: + self.handle_request() + # Notify the main thread that we're about to exit + self.finished.set() + # close the listen socket + self.server_close() + + +class SocketHandlerTest(BaseTest): + + """Test for SocketHandler objects.""" + + def setUp(self): + """Set up a TCP server to receive log messages, and a SocketHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + self.tcpserver = LogRecordSocketReceiver(port=0) + self.port = self.tcpserver.socket.getsockname()[1] + self.threads = [ + threading.Thread(target=self.tcpserver.serve_until_stopped)] + for thread in self.threads: + thread.start() + + self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) + self.sock_hdlr.setFormatter(self.root_formatter) + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.sock_hdlr) + + def tearDown(self): + """Shutdown the TCP server.""" + try: + self.tcpserver.abort = True + del self.tcpserver + self.root_logger.removeHandler(self.sock_hdlr) + self.sock_hdlr.close() + for thread in self.threads: + thread.join(2.0) + finally: + BaseTest.tearDown(self) + + def get_output(self): + """Get the log output as received by the TCP server.""" + # Signal the TCP receiver and wait for it to terminate. + self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) + self.tcpserver.finished.wait(2.0) + return self.tcpserver.log_output + + def test_output(self): + # The log message sent to the SocketHandler is properly received. + logger = logging.getLogger("tcp") + logger.error("spam") + logger.debug("eggs") + self.assertEquals(self.get_output(), "spam\neggs\n") + + +class MemoryTest(BaseTest): + + """Test memory persistence of logger objects.""" + + def setUp(self): + """Create a dict to remember potentially destroyed objects.""" + BaseTest.setUp(self) + self._survivors = {} + + def _watch_for_survival(self, *args): + """Watch the given objects for survival, by creating weakrefs to + them.""" + for obj in args: + key = id(obj), repr(obj) + self._survivors[key] = weakref.ref(obj) + + def _assert_survival(self): + """Assert that all objects watched for survival have survived.""" + # Trigger cycle breaking. + gc.collect() + dead = [] + for (id_, repr_), ref in self._survivors.items(): + if ref() is None: + dead.append(repr_) + if dead: + self.fail("%d objects should have survived " + "but have been destroyed: %s" % (len(dead), ", ".join(dead))) + + def test_persistent_loggers(self): + # Logger objects are persistent and retain their configuration, even + # if visible references are destroyed. + self.root_logger.setLevel(logging.INFO) + foo = logging.getLogger("foo") + self._watch_for_survival(foo) + foo.setLevel(logging.DEBUG) + self.root_logger.debug(self.next_message()) + foo.debug(self.next_message()) + self.assert_log_lines([ + ('foo', 'DEBUG', '2'), + ]) + del foo + # foo has survived. + self._assert_survival() + # foo has retained its settings. + bar = logging.getLogger("foo") + bar.debug(self.next_message()) + self.assert_log_lines([ + ('foo', 'DEBUG', '2'), + ('foo', 'DEBUG', '3'), + ]) + +class EncodingTest(BaseTest): + def test_encoding_plain_file(self): + # In Python 2.x, a plain file object is treated as having no encoding. + log = logging.getLogger("test") + fn = tempfile.mktemp(".log") + # the non-ascii data we write to the log. + data = "foo\x80" + try: + handler = logging.FileHandler(fn) + log.addHandler(handler) + try: + # write non-ascii data to the log. + log.warning(data) + finally: + log.removeHandler(handler) + handler.close() + # check we wrote exactly those bytes, ignoring trailing \n etc + f = open(fn) + try: + self.failUnlessEqual(f.read().rstrip(), data) + finally: + f.close() + finally: + if os.path.isfile(fn): + os.remove(fn) + +# Set the locale to the platform-dependent default. I have no idea +# why the test does this, but in any case we save the current locale +# first and restore it at the end. +@run_with_locale('LC_ALL', '') +def test_main(): + run_unittest(BuiltinLevelsTest, BasicFilterTest, + CustomLevelsAndFiltersTest, MemoryHandlerTest, + ConfigFileTest, SocketHandlerTest, MemoryTest, + EncodingTest) + +if __name__ == "__main__": + test_main()