python-2.5.2/win32/Lib/test/test_logging.py
changeset 0 ae805ac0140d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/python-2.5.2/win32/Lib/test/test_logging.py	Fri Apr 03 17:19:34 2009 +0100
@@ -0,0 +1,685 @@
+#!/usr/bin/env python
+#
+# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose and without fee is hereby granted,
+# provided that the above copyright notice appear in all copies and that
+# both that copyright notice and this permission notice appear in
+# supporting documentation, and that the name of Vinay Sajip
+# not be used in advertising or publicity pertaining to distribution
+# of the software without specific, written prior permission.
+# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
+# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
+# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
+# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+#
+# This file is part of the Python logging distribution. See
+# http://www.red-dove.com/python_logging.html
+#
+"""Test harness for the logging module. Run all tests.
+
+Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
+"""
+
+import select
+import os, sys, string, struct, types, cPickle, cStringIO
+import socket, tempfile, threading, time
+import logging, logging.handlers, logging.config
+from test.test_support import run_with_locale
+
+BANNER = "-- %-10s %-6s ---------------------------------------------------\n"
+
+FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24."
+
+#----------------------------------------------------------------------------
+# Log receiver
+#----------------------------------------------------------------------------
+
+TIMEOUT         = 10
+
+from SocketServer import ThreadingTCPServer, StreamRequestHandler
+
+class LogRecordStreamHandler(StreamRequestHandler):
+    """
+    Handler for a streaming logging request. It basically logs the record
+    using whatever logging policy is configured locally.
+    """
+
+    def handle(self):
+        """
+        Handle multiple requests - each expected to be a 4-byte length,
+        followed by the LogRecord in pickle format. Logs the record
+        according to whatever policy is configured locally.
+        """
+        while 1:
+            try:
+                chunk = self.connection.recv(4)
+                if len(chunk) < 4:
+                    break
+                slen = struct.unpack(">L", chunk)[0]
+                chunk = self.connection.recv(slen)
+                while len(chunk) < slen:
+                    chunk = chunk + self.connection.recv(slen - len(chunk))
+                obj = self.unPickle(chunk)
+                record = logging.makeLogRecord(obj)
+                self.handleLogRecord(record)
+            except:
+                raise
+
+    def unPickle(self, data):
+        return cPickle.loads(data)
+
+    def handleLogRecord(self, record):
+        logname = "logrecv.tcp." + record.name
+        #If the end-of-messages sentinel is seen, tell the server to terminate
+        if record.msg == FINISH_UP:
+            self.server.abort = 1
+        record.msg = record.msg + " (via " + logname + ")"
+        logger = logging.getLogger(logname)
+        logger.handle(record)
+
+# The server sets socketDataProcessed when it's done.
+socketDataProcessed = threading.Event()
+
+class LogRecordSocketReceiver(ThreadingTCPServer):
+    """
+    A simple-minded TCP socket-based logging receiver suitable for test
+    purposes.
+    """
+
+    allow_reuse_address = 1
+
+    def __init__(self, host='localhost',
+                             port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+                     handler=LogRecordStreamHandler):
+        ThreadingTCPServer.__init__(self, (host, port), handler)
+        self.abort = 0
+        self.timeout = 1
+
+    def serve_until_stopped(self):
+        while not self.abort:
+            rd, wr, ex = select.select([self.socket.fileno()], [], [],
+                                       self.timeout)
+            if rd:
+                self.handle_request()
+        #notify the main thread that we're about to exit
+        socketDataProcessed.set()
+        # close the listen socket
+        self.server_close()
+
+    def process_request(self, request, client_address):
+        #import threading
+        t = threading.Thread(target = self.finish_request,
+                             args = (request, client_address))
+        t.start()
+
+def runTCP(tcpserver):
+    tcpserver.serve_until_stopped()
+
+#----------------------------------------------------------------------------
+# Test 0
+#----------------------------------------------------------------------------
+
+msgcount = 0
+
+def nextmessage():
+    global msgcount
+    rv = "Message %d" % msgcount
+    msgcount = msgcount + 1
+    return rv
+
+def test0():
+    ERR = logging.getLogger("ERR")
+    ERR.setLevel(logging.ERROR)
+    INF = logging.getLogger("INF")
+    INF.setLevel(logging.INFO)
+    INF_ERR  = logging.getLogger("INF.ERR")
+    INF_ERR.setLevel(logging.ERROR)
+    DEB = logging.getLogger("DEB")
+    DEB.setLevel(logging.DEBUG)
+
+    INF_UNDEF = logging.getLogger("INF.UNDEF")
+    INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
+    UNDEF = logging.getLogger("UNDEF")
+
+    GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
+    CHILD = logging.getLogger("INF.BADPARENT")
+
+    #These should log
+    ERR.log(logging.FATAL, nextmessage())
+    ERR.error(nextmessage())
+
+    INF.log(logging.FATAL, nextmessage())
+    INF.error(nextmessage())
+    INF.warn(nextmessage())
+    INF.info(nextmessage())
+
+    INF_UNDEF.log(logging.FATAL, nextmessage())
+    INF_UNDEF.error(nextmessage())
+    INF_UNDEF.warn (nextmessage())
+    INF_UNDEF.info (nextmessage())
+
+    INF_ERR.log(logging.FATAL, nextmessage())
+    INF_ERR.error(nextmessage())
+
+    INF_ERR_UNDEF.log(logging.FATAL, nextmessage())
+    INF_ERR_UNDEF.error(nextmessage())
+
+    DEB.log(logging.FATAL, nextmessage())
+    DEB.error(nextmessage())
+    DEB.warn (nextmessage())
+    DEB.info (nextmessage())
+    DEB.debug(nextmessage())
+
+    UNDEF.log(logging.FATAL, nextmessage())
+    UNDEF.error(nextmessage())
+    UNDEF.warn (nextmessage())
+    UNDEF.info (nextmessage())
+
+    GRANDCHILD.log(logging.FATAL, nextmessage())
+    CHILD.log(logging.FATAL, nextmessage())
+
+    #These should not log
+    ERR.warn(nextmessage())
+    ERR.info(nextmessage())
+    ERR.debug(nextmessage())
+
+    INF.debug(nextmessage())
+    INF_UNDEF.debug(nextmessage())
+
+    INF_ERR.warn(nextmessage())
+    INF_ERR.info(nextmessage())
+    INF_ERR.debug(nextmessage())
+    INF_ERR_UNDEF.warn(nextmessage())
+    INF_ERR_UNDEF.info(nextmessage())
+    INF_ERR_UNDEF.debug(nextmessage())
+
+    INF.info(FINISH_UP)
+
+#----------------------------------------------------------------------------
+# Test 1
+#----------------------------------------------------------------------------
+
+#
+#   First, we define our levels. There can be as many as you want - the only
+#     limitations are that they should be integers, the lowest should be > 0 and
+#   larger values mean less information being logged. If you need specific
+#   level values which do not fit into these limitations, you can use a
+#   mapping dictionary to convert between your application levels and the
+#   logging system.
+#
+SILENT      = 10
+TACITURN    = 9
+TERSE       = 8
+EFFUSIVE    = 7
+SOCIABLE    = 6
+VERBOSE     = 5
+TALKATIVE   = 4
+GARRULOUS   = 3
+CHATTERBOX  = 2
+BORING      = 1
+
+LEVEL_RANGE = range(BORING, SILENT + 1)
+
+#
+#   Next, we define names for our levels. You don't need to do this - in which
+#   case the system will use "Level n" to denote the text for the level.
+#
+my_logging_levels = {
+    SILENT      : 'Silent',
+    TACITURN    : 'Taciturn',
+    TERSE       : 'Terse',
+    EFFUSIVE    : 'Effusive',
+    SOCIABLE    : 'Sociable',
+    VERBOSE     : 'Verbose',
+    TALKATIVE   : 'Talkative',
+    GARRULOUS   : 'Garrulous',
+    CHATTERBOX  : 'Chatterbox',
+    BORING      : 'Boring',
+}
+
+#
+#   Now, to demonstrate filtering: suppose for some perverse reason we only
+#   want to print out all except GARRULOUS messages. Let's create a filter for
+#   this purpose...
+#
+class SpecificLevelFilter(logging.Filter):
+    def __init__(self, lvl):
+        self.level = lvl
+
+    def filter(self, record):
+        return self.level != record.levelno
+
+class GarrulousFilter(SpecificLevelFilter):
+    def __init__(self):
+        SpecificLevelFilter.__init__(self, GARRULOUS)
+
+#
+#   Now, let's demonstrate filtering at the logger. This time, use a filter
+#   which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events
+#   are still excluded.
+#
+class VerySpecificFilter(logging.Filter):
+    def filter(self, record):
+        return record.levelno not in [SOCIABLE, TACITURN]
+
+def message(s):
+    sys.stdout.write("%s\n" % s)
+
+SHOULD1 = "This should only be seen at the '%s' logging level (or lower)"
+
+def test1():
+#
+#   Now, tell the logging system to associate names with our levels.
+#
+    for lvl in my_logging_levels.keys():
+        logging.addLevelName(lvl, my_logging_levels[lvl])
+
+#
+#   Now, define a test function which logs an event at each of our levels.
+#
+
+    def doLog(log):
+        for lvl in LEVEL_RANGE:
+            log.log(lvl, SHOULD1, logging.getLevelName(lvl))
+
+    log = logging.getLogger("")
+    hdlr = log.handlers[0]
+#
+#   Set the logging level to each different value and call the utility
+#   function to log events.
+#   In the output, you should see that each time round the loop, the number of
+#   logging events which are actually output decreases.
+#
+    for lvl in LEVEL_RANGE:
+        message("-- setting logging level to '%s' -----" %
+                        logging.getLevelName(lvl))
+        log.setLevel(lvl)
+        doLog(log)
+  #
+  #   Now, we demonstrate level filtering at the handler level. Tell the
+  #   handler defined above to filter at level 'SOCIABLE', and repeat the
+  #   above loop. Compare the output from the two runs.
+  #
+    hdlr.setLevel(SOCIABLE)
+    message("-- Filtering at handler level to SOCIABLE --")
+    for lvl in LEVEL_RANGE:
+        message("-- setting logging level to '%s' -----" %
+                      logging.getLevelName(lvl))
+        log.setLevel(lvl)
+        doLog(log)
+
+    hdlr.setLevel(0)    #turn off level filtering at the handler
+
+    garr = GarrulousFilter()
+    hdlr.addFilter(garr)
+    message("-- Filtering using GARRULOUS filter --")
+    for lvl in LEVEL_RANGE:
+        message("-- setting logging level to '%s' -----" %
+                        logging.getLevelName(lvl))
+        log.setLevel(lvl)
+        doLog(log)
+    spec = VerySpecificFilter()
+    log.addFilter(spec)
+    message("-- Filtering using specific filter for SOCIABLE, TACITURN --")
+    for lvl in LEVEL_RANGE:
+        message("-- setting logging level to '%s' -----" %
+                      logging.getLevelName(lvl))
+        log.setLevel(lvl)
+        doLog(log)
+
+    log.removeFilter(spec)
+    hdlr.removeFilter(garr)
+    #Undo the one level which clashes...for regression tests
+    logging.addLevelName(logging.DEBUG, "DEBUG")
+
+#----------------------------------------------------------------------------
+# Test 2
+#----------------------------------------------------------------------------
+
+MSG = "-- logging %d at INFO, messages should be seen every 10 events --"
+def test2():
+    logger = logging.getLogger("")
+    sh = logger.handlers[0]
+    sh.close()
+    logger.removeHandler(sh)
+    mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh)
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(mh)
+    message("-- logging at DEBUG, nothing should be seen yet --")
+    logger.debug("Debug message")
+    message("-- logging at INFO, nothing should be seen yet --")
+    logger.info("Info message")
+    message("-- logging at WARNING, 3 messages should be seen --")
+    logger.warn("Warn message")
+    for i in xrange(102):
+        message(MSG % i)
+        logger.info("Info index = %d", i)
+    mh.close()
+    logger.removeHandler(mh)
+    logger.addHandler(sh)
+
+#----------------------------------------------------------------------------
+# Test 3
+#----------------------------------------------------------------------------
+
+FILTER = "a.b"
+
+def doLog3():
+    logging.getLogger("a").info("Info 1")
+    logging.getLogger("a.b").info("Info 2")
+    logging.getLogger("a.c").info("Info 3")
+    logging.getLogger("a.b.c").info("Info 4")
+    logging.getLogger("a.b.c.d").info("Info 5")
+    logging.getLogger("a.bb.c").info("Info 6")
+    logging.getLogger("b").info("Info 7")
+    logging.getLogger("b.a").info("Info 8")
+    logging.getLogger("c.a.b").info("Info 9")
+    logging.getLogger("a.bb").info("Info 10")
+
+def test3():
+    root = logging.getLogger()
+    root.setLevel(logging.DEBUG)
+    hand = root.handlers[0]
+    message("Unfiltered...")
+    doLog3()
+    message("Filtered with '%s'..." % FILTER)
+    filt = logging.Filter(FILTER)
+    hand.addFilter(filt)
+    doLog3()
+    hand.removeFilter(filt)
+
+#----------------------------------------------------------------------------
+# Test 4
+#----------------------------------------------------------------------------
+
+# config0 is a standard configuration.
+config0 = """
+[loggers]
+keys=root
+
+[handlers]
+keys=hand1
+
+[formatters]
+keys=form1
+
+[logger_root]
+level=NOTSET
+handlers=hand1
+
+[handler_hand1]
+class=StreamHandler
+level=NOTSET
+formatter=form1
+args=(sys.stdout,)
+
+[formatter_form1]
+format=%(levelname)s:%(name)s:%(message)s
+datefmt=
+"""
+
+# config1 adds a little to the standard configuration.
+config1 = """
+[loggers]
+keys=root,parser
+
+[handlers]
+keys=hand1
+
+[formatters]
+keys=form1
+
+[logger_root]
+level=NOTSET
+handlers=hand1
+
+[logger_parser]
+level=DEBUG
+handlers=hand1
+propagate=1
+qualname=compiler.parser
+
+[handler_hand1]
+class=StreamHandler
+level=NOTSET
+formatter=form1
+args=(sys.stdout,)
+
+[formatter_form1]
+format=%(levelname)s:%(name)s:%(message)s
+datefmt=
+"""
+
+# config2 has a subtle configuration error that should be reported
+config2 = string.replace(config1, "sys.stdout", "sys.stbout")
+
+# config3 has a less subtle configuration error
+config3 = string.replace(
+    config1, "formatter=form1", "formatter=misspelled_name")
+
+def test4():
+    for i in range(4):
+        conf = globals()['config%d' % i]
+        sys.stdout.write('config%d: ' % i)
+        loggerDict = logging.getLogger().manager.loggerDict
+        logging._acquireLock()
+        try:
+            saved_handlers = logging._handlers.copy()
+            saved_handler_list = logging._handlerList[:]
+            saved_loggers = loggerDict.copy()
+        finally:
+            logging._releaseLock()
+        try:
+            fn = tempfile.mktemp(".ini")
+            f = open(fn, "w")
+            f.write(conf)
+            f.close()
+            try:
+                logging.config.fileConfig(fn)
+                #call again to make sure cleanup is correct
+                logging.config.fileConfig(fn)
+            except:
+                t = sys.exc_info()[0]
+                message(str(t))
+            else:
+                message('ok.')
+            os.remove(fn)
+        finally:
+            logging._acquireLock()
+            try:
+                logging._handlers.clear()
+                logging._handlers.update(saved_handlers)
+                logging._handlerList[:] = saved_handler_list
+                loggerDict = logging.getLogger().manager.loggerDict
+                loggerDict.clear()
+                loggerDict.update(saved_loggers)
+            finally:
+                logging._releaseLock()
+
+#----------------------------------------------------------------------------
+# Test 5
+#----------------------------------------------------------------------------
+
+test5_config = """
+[loggers]
+keys=root
+
+[handlers]
+keys=hand1
+
+[formatters]
+keys=form1
+
+[logger_root]
+level=NOTSET
+handlers=hand1
+
+[handler_hand1]
+class=StreamHandler
+level=NOTSET
+formatter=form1
+args=(sys.stdout,)
+
+[formatter_form1]
+class=test.test_logging.FriendlyFormatter
+format=%(levelname)s:%(name)s:%(message)s
+datefmt=
+"""
+
+class FriendlyFormatter (logging.Formatter):
+    def formatException(self, ei):
+        return "%s... Don't panic!" % str(ei[0])
+
+
+def test5():
+    loggerDict = logging.getLogger().manager.loggerDict
+    logging._acquireLock()
+    try:
+        saved_handlers = logging._handlers.copy()
+        saved_handler_list = logging._handlerList[:]
+        saved_loggers = loggerDict.copy()
+    finally:
+        logging._releaseLock()
+    try:
+        fn = tempfile.mktemp(".ini")
+        f = open(fn, "w")
+        f.write(test5_config)
+        f.close()
+        logging.config.fileConfig(fn)
+        try:
+            raise KeyError
+        except KeyError:
+            logging.exception("just testing")
+        os.remove(fn)
+        hdlr = logging.getLogger().handlers[0]
+        logging.getLogger().handlers.remove(hdlr)
+    finally:
+        logging._acquireLock()
+        try:
+            logging._handlers.clear()
+            logging._handlers.update(saved_handlers)
+            logging._handlerList[:] = saved_handler_list
+            loggerDict = logging.getLogger().manager.loggerDict
+            loggerDict.clear()
+            loggerDict.update(saved_loggers)
+        finally:
+            logging._releaseLock()
+
+
+#----------------------------------------------------------------------------
+# Test Harness
+#----------------------------------------------------------------------------
+def banner(nm, typ):
+    sep = BANNER % (nm, typ)
+    sys.stdout.write(sep)
+    sys.stdout.flush()
+
+def test_main_inner():
+    rootLogger = logging.getLogger("")
+    rootLogger.setLevel(logging.DEBUG)
+    hdlr = logging.StreamHandler(sys.stdout)
+    fmt = logging.Formatter(logging.BASIC_FORMAT)
+    hdlr.setFormatter(fmt)
+    rootLogger.addHandler(hdlr)
+
+    # Find an unused port number
+    port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
+    while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100:
+        try:
+            tcpserver = LogRecordSocketReceiver(port=port)
+        except socket.error:
+            port += 1
+        else:
+            break
+    else:
+        raise ImportError, "Could not find unused port"
+
+
+    #Set up a handler such that all events are sent via a socket to the log
+    #receiver (logrecv).
+    #The handler will only be added to the rootLogger for some of the tests
+    shdlr = logging.handlers.SocketHandler('localhost', port)
+
+    #Configure the logger for logrecv so events do not propagate beyond it.
+    #The sockLogger output is buffered in memory until the end of the test,
+    #and printed at the end.
+    sockOut = cStringIO.StringIO()
+    sockLogger = logging.getLogger("logrecv")
+    sockLogger.setLevel(logging.DEBUG)
+    sockhdlr = logging.StreamHandler(sockOut)
+    sockhdlr.setFormatter(logging.Formatter(
+                                   "%(name)s -> %(levelname)s: %(message)s"))
+    sockLogger.addHandler(sockhdlr)
+    sockLogger.propagate = 0
+
+    #Set up servers
+    threads = []
+    #sys.stdout.write("About to start TCP server...\n")
+    threads.append(threading.Thread(target=runTCP, args=(tcpserver,)))
+
+    for thread in threads:
+        thread.start()
+    try:
+        banner("log_test0", "begin")
+
+        rootLogger.addHandler(shdlr)
+        test0()
+        # XXX(nnorwitz): Try to fix timing related test failures.
+        # This sleep gives us some extra time to read messages.
+        # The test generally only fails on Solaris without this sleep.
+        time.sleep(2.0)
+        shdlr.close()
+        rootLogger.removeHandler(shdlr)
+
+        banner("log_test0", "end")
+
+        for t in range(1,6):
+            banner("log_test%d" % t, "begin")
+            globals()['test%d' % t]()
+            banner("log_test%d" % t, "end")
+
+    finally:
+        #wait for TCP receiver to terminate
+        socketDataProcessed.wait()
+        # ensure the server dies
+        tcpserver.abort = 1
+        for thread in threads:
+            thread.join(2.0)
+        banner("logrecv output", "begin")
+        sys.stdout.write(sockOut.getvalue())
+        sockOut.close()
+        sockLogger.removeHandler(sockhdlr)
+        sockhdlr.close()
+        banner("logrecv output", "end")
+        sys.stdout.flush()
+        try:
+            hdlr.close()
+        except:
+            pass
+        rootLogger.removeHandler(hdlr)
+
+# Set the locale to the platform-dependent default.  I have no idea
+# why the test does this, but in any case we save the current locale
+# first and restore it at the end.
+@run_with_locale('LC_ALL', '')
+def test_main():
+    # Save and restore the original root logger level across the tests.
+    # Otherwise, e.g., if any test using cookielib runs after test_logging,
+    # cookielib's debug-level logger tries to log messages, leading to
+    # confusing:
+    #    No handlers could be found for logger "cookielib"
+    # output while the tests are running.
+    root_logger = logging.getLogger("")
+    original_logging_level = root_logger.getEffectiveLevel()
+    try:
+        test_main_inner()
+    finally:
+        root_logger.setLevel(original_logging_level)
+
+if __name__ == "__main__":
+    sys.stdout.write("test_logging\n")
+    test_main()