diff -r 000000000000 -r ae805ac0140d python-2.5.2/win32/Lib/test/test_logging.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python-2.5.2/win32/Lib/test/test_logging.py Fri Apr 03 17:19:34 2009 +0100 @@ -0,0 +1,685 @@ +#!/usr/bin/env python +# +# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved. +# +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose and without fee is hereby granted, +# provided that the above copyright notice appear in all copies and that +# both that copyright notice and this permission notice appear in +# supporting documentation, and that the name of Vinay Sajip +# not be used in advertising or publicity pertaining to distribution +# of the software without specific, written prior permission. +# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING +# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL +# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR +# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER +# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT +# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# +# This file is part of the Python logging distribution. See +# http://www.red-dove.com/python_logging.html +# +"""Test harness for the logging module. Run all tests. + +Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved. +""" + +import select +import os, sys, string, struct, types, cPickle, cStringIO +import socket, tempfile, threading, time +import logging, logging.handlers, logging.config +from test.test_support import run_with_locale + +BANNER = "-- %-10s %-6s ---------------------------------------------------\n" + +FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24." + +#---------------------------------------------------------------------------- +# Log receiver +#---------------------------------------------------------------------------- + +TIMEOUT = 10 + +from SocketServer import ThreadingTCPServer, StreamRequestHandler + +class LogRecordStreamHandler(StreamRequestHandler): + """ + Handler for a streaming logging request. It basically logs the record + using whatever logging policy is configured locally. + """ + + def handle(self): + """ + Handle multiple requests - each expected to be a 4-byte length, + followed by the LogRecord in pickle format. Logs the record + according to whatever policy is configured locally. + """ + while 1: + try: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unPickle(chunk) + record = logging.makeLogRecord(obj) + self.handleLogRecord(record) + except: + raise + + def unPickle(self, data): + return cPickle.loads(data) + + def handleLogRecord(self, record): + logname = "logrecv.tcp." + record.name + #If the end-of-messages sentinel is seen, tell the server to terminate + if record.msg == FINISH_UP: + self.server.abort = 1 + record.msg = record.msg + " (via " + logname + ")" + logger = logging.getLogger(logname) + logger.handle(record) + +# The server sets socketDataProcessed when it's done. +socketDataProcessed = threading.Event() + +class LogRecordSocketReceiver(ThreadingTCPServer): + """ + A simple-minded TCP socket-based logging receiver suitable for test + purposes. + """ + + allow_reuse_address = 1 + + def __init__(self, host='localhost', + port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, + handler=LogRecordStreamHandler): + ThreadingTCPServer.__init__(self, (host, port), handler) + self.abort = 0 + self.timeout = 1 + + def serve_until_stopped(self): + while not self.abort: + rd, wr, ex = select.select([self.socket.fileno()], [], [], + self.timeout) + if rd: + self.handle_request() + #notify the main thread that we're about to exit + socketDataProcessed.set() + # close the listen socket + self.server_close() + + def process_request(self, request, client_address): + #import threading + t = threading.Thread(target = self.finish_request, + args = (request, client_address)) + t.start() + +def runTCP(tcpserver): + tcpserver.serve_until_stopped() + +#---------------------------------------------------------------------------- +# Test 0 +#---------------------------------------------------------------------------- + +msgcount = 0 + +def nextmessage(): + global msgcount + rv = "Message %d" % msgcount + msgcount = msgcount + 1 + return rv + +def test0(): + ERR = logging.getLogger("ERR") + ERR.setLevel(logging.ERROR) + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + INF_ERR = logging.getLogger("INF.ERR") + INF_ERR.setLevel(logging.ERROR) + DEB = logging.getLogger("DEB") + DEB.setLevel(logging.DEBUG) + + INF_UNDEF = logging.getLogger("INF.UNDEF") + INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") + UNDEF = logging.getLogger("UNDEF") + + GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") + CHILD = logging.getLogger("INF.BADPARENT") + + #These should log + ERR.log(logging.FATAL, nextmessage()) + ERR.error(nextmessage()) + + INF.log(logging.FATAL, nextmessage()) + INF.error(nextmessage()) + INF.warn(nextmessage()) + INF.info(nextmessage()) + + INF_UNDEF.log(logging.FATAL, nextmessage()) + INF_UNDEF.error(nextmessage()) + INF_UNDEF.warn (nextmessage()) + INF_UNDEF.info (nextmessage()) + + INF_ERR.log(logging.FATAL, nextmessage()) + INF_ERR.error(nextmessage()) + + INF_ERR_UNDEF.log(logging.FATAL, nextmessage()) + INF_ERR_UNDEF.error(nextmessage()) + + DEB.log(logging.FATAL, nextmessage()) + DEB.error(nextmessage()) + DEB.warn (nextmessage()) + DEB.info (nextmessage()) + DEB.debug(nextmessage()) + + UNDEF.log(logging.FATAL, nextmessage()) + UNDEF.error(nextmessage()) + UNDEF.warn (nextmessage()) + UNDEF.info (nextmessage()) + + GRANDCHILD.log(logging.FATAL, nextmessage()) + CHILD.log(logging.FATAL, nextmessage()) + + #These should not log + ERR.warn(nextmessage()) + ERR.info(nextmessage()) + ERR.debug(nextmessage()) + + INF.debug(nextmessage()) + INF_UNDEF.debug(nextmessage()) + + INF_ERR.warn(nextmessage()) + INF_ERR.info(nextmessage()) + INF_ERR.debug(nextmessage()) + INF_ERR_UNDEF.warn(nextmessage()) + INF_ERR_UNDEF.info(nextmessage()) + INF_ERR_UNDEF.debug(nextmessage()) + + INF.info(FINISH_UP) + +#---------------------------------------------------------------------------- +# Test 1 +#---------------------------------------------------------------------------- + +# +# First, we define our levels. There can be as many as you want - the only +# limitations are that they should be integers, the lowest should be > 0 and +# larger values mean less information being logged. If you need specific +# level values which do not fit into these limitations, you can use a +# mapping dictionary to convert between your application levels and the +# logging system. +# +SILENT = 10 +TACITURN = 9 +TERSE = 8 +EFFUSIVE = 7 +SOCIABLE = 6 +VERBOSE = 5 +TALKATIVE = 4 +GARRULOUS = 3 +CHATTERBOX = 2 +BORING = 1 + +LEVEL_RANGE = range(BORING, SILENT + 1) + +# +# Next, we define names for our levels. You don't need to do this - in which +# case the system will use "Level n" to denote the text for the level. +# +my_logging_levels = { + SILENT : 'Silent', + TACITURN : 'Taciturn', + TERSE : 'Terse', + EFFUSIVE : 'Effusive', + SOCIABLE : 'Sociable', + VERBOSE : 'Verbose', + TALKATIVE : 'Talkative', + GARRULOUS : 'Garrulous', + CHATTERBOX : 'Chatterbox', + BORING : 'Boring', +} + +# +# Now, to demonstrate filtering: suppose for some perverse reason we only +# want to print out all except GARRULOUS messages. Let's create a filter for +# this purpose... +# +class SpecificLevelFilter(logging.Filter): + def __init__(self, lvl): + self.level = lvl + + def filter(self, record): + return self.level != record.levelno + +class GarrulousFilter(SpecificLevelFilter): + def __init__(self): + SpecificLevelFilter.__init__(self, GARRULOUS) + +# +# Now, let's demonstrate filtering at the logger. This time, use a filter +# which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events +# are still excluded. +# +class VerySpecificFilter(logging.Filter): + def filter(self, record): + return record.levelno not in [SOCIABLE, TACITURN] + +def message(s): + sys.stdout.write("%s\n" % s) + +SHOULD1 = "This should only be seen at the '%s' logging level (or lower)" + +def test1(): +# +# Now, tell the logging system to associate names with our levels. +# + for lvl in my_logging_levels.keys(): + logging.addLevelName(lvl, my_logging_levels[lvl]) + +# +# Now, define a test function which logs an event at each of our levels. +# + + def doLog(log): + for lvl in LEVEL_RANGE: + log.log(lvl, SHOULD1, logging.getLevelName(lvl)) + + log = logging.getLogger("") + hdlr = log.handlers[0] +# +# Set the logging level to each different value and call the utility +# function to log events. +# In the output, you should see that each time round the loop, the number of +# logging events which are actually output decreases. +# + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + # + # Now, we demonstrate level filtering at the handler level. Tell the + # handler defined above to filter at level 'SOCIABLE', and repeat the + # above loop. Compare the output from the two runs. + # + hdlr.setLevel(SOCIABLE) + message("-- Filtering at handler level to SOCIABLE --") + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + + hdlr.setLevel(0) #turn off level filtering at the handler + + garr = GarrulousFilter() + hdlr.addFilter(garr) + message("-- Filtering using GARRULOUS filter --") + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + spec = VerySpecificFilter() + log.addFilter(spec) + message("-- Filtering using specific filter for SOCIABLE, TACITURN --") + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + + log.removeFilter(spec) + hdlr.removeFilter(garr) + #Undo the one level which clashes...for regression tests + logging.addLevelName(logging.DEBUG, "DEBUG") + +#---------------------------------------------------------------------------- +# Test 2 +#---------------------------------------------------------------------------- + +MSG = "-- logging %d at INFO, messages should be seen every 10 events --" +def test2(): + logger = logging.getLogger("") + sh = logger.handlers[0] + sh.close() + logger.removeHandler(sh) + mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh) + logger.setLevel(logging.DEBUG) + logger.addHandler(mh) + message("-- logging at DEBUG, nothing should be seen yet --") + logger.debug("Debug message") + message("-- logging at INFO, nothing should be seen yet --") + logger.info("Info message") + message("-- logging at WARNING, 3 messages should be seen --") + logger.warn("Warn message") + for i in xrange(102): + message(MSG % i) + logger.info("Info index = %d", i) + mh.close() + logger.removeHandler(mh) + logger.addHandler(sh) + +#---------------------------------------------------------------------------- +# Test 3 +#---------------------------------------------------------------------------- + +FILTER = "a.b" + +def doLog3(): + logging.getLogger("a").info("Info 1") + logging.getLogger("a.b").info("Info 2") + logging.getLogger("a.c").info("Info 3") + logging.getLogger("a.b.c").info("Info 4") + logging.getLogger("a.b.c.d").info("Info 5") + logging.getLogger("a.bb.c").info("Info 6") + logging.getLogger("b").info("Info 7") + logging.getLogger("b.a").info("Info 8") + logging.getLogger("c.a.b").info("Info 9") + logging.getLogger("a.bb").info("Info 10") + +def test3(): + root = logging.getLogger() + root.setLevel(logging.DEBUG) + hand = root.handlers[0] + message("Unfiltered...") + doLog3() + message("Filtered with '%s'..." % FILTER) + filt = logging.Filter(FILTER) + hand.addFilter(filt) + doLog3() + hand.removeFilter(filt) + +#---------------------------------------------------------------------------- +# Test 4 +#---------------------------------------------------------------------------- + +# config0 is a standard configuration. +config0 = """ +[loggers] +keys=root + +[handlers] +keys=hand1 + +[formatters] +keys=form1 + +[logger_root] +level=NOTSET +handlers=hand1 + +[handler_hand1] +class=StreamHandler +level=NOTSET +formatter=form1 +args=(sys.stdout,) + +[formatter_form1] +format=%(levelname)s:%(name)s:%(message)s +datefmt= +""" + +# config1 adds a little to the standard configuration. +config1 = """ +[loggers] +keys=root,parser + +[handlers] +keys=hand1 + +[formatters] +keys=form1 + +[logger_root] +level=NOTSET +handlers=hand1 + +[logger_parser] +level=DEBUG +handlers=hand1 +propagate=1 +qualname=compiler.parser + +[handler_hand1] +class=StreamHandler +level=NOTSET +formatter=form1 +args=(sys.stdout,) + +[formatter_form1] +format=%(levelname)s:%(name)s:%(message)s +datefmt= +""" + +# config2 has a subtle configuration error that should be reported +config2 = string.replace(config1, "sys.stdout", "sys.stbout") + +# config3 has a less subtle configuration error +config3 = string.replace( + config1, "formatter=form1", "formatter=misspelled_name") + +def test4(): + for i in range(4): + conf = globals()['config%d' % i] + sys.stdout.write('config%d: ' % i) + loggerDict = logging.getLogger().manager.loggerDict + logging._acquireLock() + try: + saved_handlers = logging._handlers.copy() + saved_handler_list = logging._handlerList[:] + saved_loggers = loggerDict.copy() + finally: + logging._releaseLock() + try: + fn = tempfile.mktemp(".ini") + f = open(fn, "w") + f.write(conf) + f.close() + try: + logging.config.fileConfig(fn) + #call again to make sure cleanup is correct + logging.config.fileConfig(fn) + except: + t = sys.exc_info()[0] + message(str(t)) + else: + message('ok.') + os.remove(fn) + finally: + logging._acquireLock() + try: + logging._handlers.clear() + logging._handlers.update(saved_handlers) + logging._handlerList[:] = saved_handler_list + loggerDict = logging.getLogger().manager.loggerDict + loggerDict.clear() + loggerDict.update(saved_loggers) + finally: + logging._releaseLock() + +#---------------------------------------------------------------------------- +# Test 5 +#---------------------------------------------------------------------------- + +test5_config = """ +[loggers] +keys=root + +[handlers] +keys=hand1 + +[formatters] +keys=form1 + +[logger_root] +level=NOTSET +handlers=hand1 + +[handler_hand1] +class=StreamHandler +level=NOTSET +formatter=form1 +args=(sys.stdout,) + +[formatter_form1] +class=test.test_logging.FriendlyFormatter +format=%(levelname)s:%(name)s:%(message)s +datefmt= +""" + +class FriendlyFormatter (logging.Formatter): + def formatException(self, ei): + return "%s... Don't panic!" % str(ei[0]) + + +def test5(): + loggerDict = logging.getLogger().manager.loggerDict + logging._acquireLock() + try: + saved_handlers = logging._handlers.copy() + saved_handler_list = logging._handlerList[:] + saved_loggers = loggerDict.copy() + finally: + logging._releaseLock() + try: + fn = tempfile.mktemp(".ini") + f = open(fn, "w") + f.write(test5_config) + f.close() + logging.config.fileConfig(fn) + try: + raise KeyError + except KeyError: + logging.exception("just testing") + os.remove(fn) + hdlr = logging.getLogger().handlers[0] + logging.getLogger().handlers.remove(hdlr) + finally: + logging._acquireLock() + try: + logging._handlers.clear() + logging._handlers.update(saved_handlers) + logging._handlerList[:] = saved_handler_list + loggerDict = logging.getLogger().manager.loggerDict + loggerDict.clear() + loggerDict.update(saved_loggers) + finally: + logging._releaseLock() + + +#---------------------------------------------------------------------------- +# Test Harness +#---------------------------------------------------------------------------- +def banner(nm, typ): + sep = BANNER % (nm, typ) + sys.stdout.write(sep) + sys.stdout.flush() + +def test_main_inner(): + rootLogger = logging.getLogger("") + rootLogger.setLevel(logging.DEBUG) + hdlr = logging.StreamHandler(sys.stdout) + fmt = logging.Formatter(logging.BASIC_FORMAT) + hdlr.setFormatter(fmt) + rootLogger.addHandler(hdlr) + + # Find an unused port number + port = logging.handlers.DEFAULT_TCP_LOGGING_PORT + while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100: + try: + tcpserver = LogRecordSocketReceiver(port=port) + except socket.error: + port += 1 + else: + break + else: + raise ImportError, "Could not find unused port" + + + #Set up a handler such that all events are sent via a socket to the log + #receiver (logrecv). + #The handler will only be added to the rootLogger for some of the tests + shdlr = logging.handlers.SocketHandler('localhost', port) + + #Configure the logger for logrecv so events do not propagate beyond it. + #The sockLogger output is buffered in memory until the end of the test, + #and printed at the end. + sockOut = cStringIO.StringIO() + sockLogger = logging.getLogger("logrecv") + sockLogger.setLevel(logging.DEBUG) + sockhdlr = logging.StreamHandler(sockOut) + sockhdlr.setFormatter(logging.Formatter( + "%(name)s -> %(levelname)s: %(message)s")) + sockLogger.addHandler(sockhdlr) + sockLogger.propagate = 0 + + #Set up servers + threads = [] + #sys.stdout.write("About to start TCP server...\n") + threads.append(threading.Thread(target=runTCP, args=(tcpserver,))) + + for thread in threads: + thread.start() + try: + banner("log_test0", "begin") + + rootLogger.addHandler(shdlr) + test0() + # XXX(nnorwitz): Try to fix timing related test failures. + # This sleep gives us some extra time to read messages. + # The test generally only fails on Solaris without this sleep. + time.sleep(2.0) + shdlr.close() + rootLogger.removeHandler(shdlr) + + banner("log_test0", "end") + + for t in range(1,6): + banner("log_test%d" % t, "begin") + globals()['test%d' % t]() + banner("log_test%d" % t, "end") + + finally: + #wait for TCP receiver to terminate + socketDataProcessed.wait() + # ensure the server dies + tcpserver.abort = 1 + for thread in threads: + thread.join(2.0) + banner("logrecv output", "begin") + sys.stdout.write(sockOut.getvalue()) + sockOut.close() + sockLogger.removeHandler(sockhdlr) + sockhdlr.close() + banner("logrecv output", "end") + sys.stdout.flush() + try: + hdlr.close() + except: + pass + rootLogger.removeHandler(hdlr) + +# Set the locale to the platform-dependent default. I have no idea +# why the test does this, but in any case we save the current locale +# first and restore it at the end. +@run_with_locale('LC_ALL', '') +def test_main(): + # Save and restore the original root logger level across the tests. + # Otherwise, e.g., if any test using cookielib runs after test_logging, + # cookielib's debug-level logger tries to log messages, leading to + # confusing: + # No handlers could be found for logger "cookielib" + # output while the tests are running. + root_logger = logging.getLogger("") + original_logging_level = root_logger.getEffectiveLevel() + try: + test_main_inner() + finally: + root_logger.setLevel(original_logging_level) + +if __name__ == "__main__": + sys.stdout.write("test_logging\n") + test_main()