symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_thread.py
author Gareth Stockwell <gareth.stockwell@accenture.com>
Wed, 22 Sep 2010 15:40:40 +0100
branch graphics-phase-3
changeset 111 345f1c88c950
parent 1 2fb8b9db1c86
permissions -rw-r--r--
Fixes to syborg-graphicswrapper.vcproj These changes allow syborg-graphicswrapper to link against the hostthreadadapter and khronosapiwrapper libraries built by the graphics.simulator component. The .vcproj file uses relative paths, which requires that the following three packages are laid out as follows: os/ graphics adapt/ graphics.simulator qemu

"""TestCases for multi-threaded access to a DB.
"""

import os
import sys
import time
import errno
from random import random

DASH = '-'

# Probe for the WindowsError name; where it is not defined, bind it to a
# placeholder Exception subclass so that referring to it never raises
# NameError.
try:
    WindowsError
except NameError:
    class WindowsError(Exception):
        pass

import unittest
from test_all import db, dbutils, test_support, verbose, have_threads, \
        get_new_environment_path, get_new_database_path

# The threading names are imported only when test_all reports thread support.
if have_threads :
    from threading import Thread
    import sys
    # Bind the name currentThread to whichever spelling is imported for this
    # interpreter's major version.
    if sys.version_info[0] < 3 :
        from threading import currentThread
    else :
        from threading import current_thread as currentThread


#----------------------------------------------------------------------

class BaseThreadedTestCase(unittest.TestCase):
    """Common fixture for the threaded DB test cases.

    setUp obtains a fresh environment path, opens a DBEnv there and opens
    one DB inside that environment; tearDown closes both and removes the
    directory.  Derived classes tune the fixture through the class
    attributes below and by overriding setEnvOpts().
    """

    dbtype = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags = 0         # OR-ed with DB_CREATE when the DB is opened
    dbsetflags = 0          # handed to DB.set_flags() when non-zero
    envflags = 0            # OR-ed with DB_CREATE when the env is opened

    # Imported in the class body, so "sys" is also bound as a class attribute.
    import sys
    if sys.version_info < (2, 4):
        # Fallback for interpreters older than 2.4: delegate to failUnless.
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr,msg=msg)

    def setUp(self):
        """Open a fresh environment and a database inside it."""
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        # Environment first: setEnvOpts() runs after the DBEnv object exists
        # but before it is opened.
        self.homeDir = get_new_environment_path()
        self.env = db.DBEnv()
        self.setEnvOpts()
        self.env.open(self.homeDir, db.DB_CREATE | self.envflags)

        # The database file is named after the concrete test class.
        self.filename = '%s.db' % type(self).__name__
        self.d = db.DB(self.env)
        set_flags = self.dbsetflags
        if set_flags:
            self.d.set_flags(set_flags)
        self.d.open(self.filename, self.dbtype,
                    db.DB_CREATE | self.dbopenflags)

    def tearDown(self):
        """Close the database and the environment, then delete the directory."""
        self.d.close()
        self.env.close()
        test_support.rmtree(self.homeDir)

    def setEnvOpts(self):
        """Hook for derived classes; called before the environment is opened."""
        pass

    def makeData(self, key):
        """Return the value stored under *key*: five copies joined by DASH."""
        return DASH.join((key,) * 5)


#----------------------------------------------------------------------


class ConcurrentDataStoreBase(BaseThreadedTestCase):
    dbopenflags = db.DB_THREAD
    envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    readers     = 0 # derived class should set
    writers     = 0
    records     = 1000

    def test01_1WriterMultiReaders(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_1WriterMultiReaders..." % \
                  self.__class__.__name__

        keys=range(self.records)
        import random
        random.shuffle(keys)
        records_per_writer=self.records//self.writers
        readers_per_writer=self.readers//self.writers
        self.assertEqual(self.records,self.writers*records_per_writer)
        self.assertEqual(self.readers,self.writers*readers_per_writer)
        self.assertTrue((records_per_writer%readers_per_writer)==0)
        readers = []

        for x in xrange(self.readers):
            rt = Thread(target = self.readerThread,
                        args = (self.d, x),
                        name = 'reader %d' % x,
                        )#verbose = verbose)
            import sys
            if sys.version_info[0] < 3 :
                rt.setDaemon(True)
            else :
                rt.daemon = True
            readers.append(rt)

        writers=[]
        for x in xrange(self.writers):
            a=keys[records_per_writer*x:records_per_writer*(x+1)]
            a.sort()  # Generate conflicts
            b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
            wt = Thread(target = self.writerThread,
                        args = (self.d, a, b),
                        name = 'writer %d' % x,
                        )#verbose = verbose)
            writers.append(wt)

        for t in writers:
            import sys
            if sys.version_info[0] < 3 :
                t.setDaemon(True)
            else :
                t.daemon = True
            t.start()

        for t in writers:
            t.join()
        for t in readers:
            t.join()

    def writerThread(self, d, keys, readers):
        import sys
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name

        if verbose:
            print "%s: creating records %d - %d" % (name, start, stop)

        count=len(keys)//len(readers)
        count2=count
        for x in keys :
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)
            if verbose and x % 100 == 0:
                print "%s: records %d - %d finished" % (name, start, x)

            count2-=1
            if not count2 :
                readers.pop().start()
                count2=count

        if verbose:
            print "%s: finished creating records" % name

        if verbose:
            print "%s: thread finished" % name

    def readerThread(self, d, readerNum):
        import sys
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name

        for i in xrange(5) :
            c = d.cursor()
            count = 0
            rec = c.first()
            while rec:
                count += 1
                key, data = rec
                self.assertEqual(self.makeData(key), data)
                rec = c.next()
            if verbose:
                print "%s: found %d records" % (name, count)
            c.close()

        if verbose:
            print "%s: thread finished" % name


class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent data store test using a DB_BTREE database."""
    dbtype  = db.DB_BTREE
    writers = 2
    readers = 10
    records = 1000


class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent data store test using a DB_HASH database."""
    dbtype  = db.DB_HASH
    writers = 2
    readers = 10
    records = 1000


#----------------------------------------------------------------------

class SimpleThreadedBase(BaseThreadedTestCase):
    dbopenflags = db.DB_THREAD
    envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    readers = 10
    writers = 2
    records = 1000

    def setEnvOpts(self):
        self.env.set_lk_detect(db.DB_LOCK_DEFAULT)

    def test02_SimpleLocks(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_SimpleLocks..." % self.__class__.__name__


        keys=range(self.records)
        import random
        random.shuffle(keys)
        records_per_writer=self.records//self.writers
        readers_per_writer=self.readers//self.writers
        self.assertEqual(self.records,self.writers*records_per_writer)
        self.assertEqual(self.readers,self.writers*readers_per_writer)
        self.assertTrue((records_per_writer%readers_per_writer)==0)

        readers = []
        for x in xrange(self.readers):
            rt = Thread(target = self.readerThread,
                        args = (self.d, x),
                        name = 'reader %d' % x,
                        )#verbose = verbose)
            import sys
            if sys.version_info[0] < 3 :
                rt.setDaemon(True)
            else :
                rt.daemon = True
            readers.append(rt)

        writers = []
        for x in xrange(self.writers):
            a=keys[records_per_writer*x:records_per_writer*(x+1)]
            a.sort()  # Generate conflicts
            b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
            wt = Thread(target = self.writerThread,
                        args = (self.d, a, b),
                        name = 'writer %d' % x,
                        )#verbose = verbose)
            writers.append(wt)

        for t in writers:
            import sys
            if sys.version_info[0] < 3 :
                t.setDaemon(True)
            else :
                t.daemon = True
            t.start()

        for t in writers:
            t.join()
        for t in readers:
            t.join()

    def writerThread(self, d, keys, readers):
        import sys
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name
        if verbose:
            print "%s: creating records %d - %d" % (name, start, stop)

        count=len(keys)//len(readers)
        count2=count
        for x in keys :
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)

            if verbose and x % 100 == 0:
                print "%s: records %d - %d finished" % (name, start, x)

            count2-=1
            if not count2 :
                readers.pop().start()
                count2=count

        if verbose:
            print "%s: thread finished" % name

    def readerThread(self, d, readerNum):
        import sys
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name

        c = d.cursor()
        count = 0
        rec = dbutils.DeadlockWrap(c.first, max_retries=10)
        while rec:
            count += 1
            key, data = rec
            self.assertEqual(self.makeData(key), data)
            rec = dbutils.DeadlockWrap(c.next, max_retries=10)
        if verbose:
            print "%s: found %d records" % (name, count)
        c.close()

        if verbose:
            print "%s: thread finished" % name


class BTreeSimpleThreaded(SimpleThreadedBase):
    """Simple locking test using a DB_BTREE database."""
    dbtype = db.DB_BTREE


class HashSimpleThreaded(SimpleThreadedBase):
    """Simple locking test using a DB_HASH database."""
    dbtype = db.DB_HASH


#----------------------------------------------------------------------


class ThreadedTransactionsBase(BaseThreadedTestCase):
    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    envflags    = (db.DB_THREAD |
                   db.DB_INIT_MPOOL |
                   db.DB_INIT_LOCK |
                   db.DB_INIT_LOG |
                   db.DB_INIT_TXN
                   )
    readers = 0
    writers = 0
    records = 2000
    txnFlag = 0

    def setEnvOpts(self):
        #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
        pass

    def test03_ThreadedTransactions(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03_ThreadedTransactions..." % \
                  self.__class__.__name__

        keys=range(self.records)
        import random
        random.shuffle(keys)
        records_per_writer=self.records//self.writers
        readers_per_writer=self.readers//self.writers
        self.assertEqual(self.records,self.writers*records_per_writer)
        self.assertEqual(self.readers,self.writers*readers_per_writer)
        self.assertTrue((records_per_writer%readers_per_writer)==0)

        readers=[]
        for x in xrange(self.readers):
            rt = Thread(target = self.readerThread,
                        args = (self.d, x),
                        name = 'reader %d' % x,
                        )#verbose = verbose)
            import sys
            if sys.version_info[0] < 3 :
                rt.setDaemon(True)
            else :
                rt.daemon = True
            readers.append(rt)

        writers = []
        for x in xrange(self.writers):
            a=keys[records_per_writer*x:records_per_writer*(x+1)]
            b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
            wt = Thread(target = self.writerThread,
                        args = (self.d, a, b),
                        name = 'writer %d' % x,
                        )#verbose = verbose)
            writers.append(wt)

        dt = Thread(target = self.deadlockThread)
        import sys
        if sys.version_info[0] < 3 :
            dt.setDaemon(True)
        else :
            dt.daemon = True
        dt.start()

        for t in writers:
            import sys
            if sys.version_info[0] < 3 :
                t.setDaemon(True)
            else :
                t.daemon = True
            t.start()

        for t in writers:
            t.join()
        for t in readers:
            t.join()

        self.doLockDetect = False
        dt.join()

    def writerThread(self, d, keys, readers):
        import sys
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name

        count=len(keys)//len(readers)
        while len(keys):
            try:
                txn = self.env.txn_begin(None, self.txnFlag)
                keys2=keys[:count]
                for x in keys2 :
                    key = '%04d' % x
                    d.put(key, self.makeData(key), txn)
                    if verbose and x % 100 == 0:
                        print "%s: records %d - %d finished" % (name, start, x)
                txn.commit()
                keys=keys[count:]
                readers.pop().start()
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
                if verbose:
                    print "%s: Aborting transaction (%s)" % (name, val[1])
                txn.abort()

        if verbose:
            print "%s: thread finished" % name

    def readerThread(self, d, readerNum):
        import sys
        if sys.version_info[0] < 3 :
            name = currentThread().getName()
        else :
            name = currentThread().name

        finished = False
        while not finished:
            try:
                txn = self.env.txn_begin(None, self.txnFlag)
                c = d.cursor(txn)
                count = 0
                rec = c.first()
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = c.next()
                if verbose: print "%s: found %d records" % (name, count)
                c.close()
                txn.commit()
                finished = True
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
                if verbose:
                    print "%s: Aborting transaction (%s)" % (name, val[1])
                c.close()
                txn.abort()

        if verbose:
            print "%s: thread finished" % name

    def deadlockThread(self):
        self.doLockDetect = True
        while self.doLockDetect:
            time.sleep(0.05)
            try:
                aborted = self.env.lock_detect(
                    db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
                if verbose and aborted:
                    print "deadlock: Aborted %d deadlocked transaction(s)" \
                          % aborted
            except db.DBError:
                pass


class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """Threaded transaction test using a DB_BTREE database."""
    dbtype = db.DB_BTREE
    writers = 2
    readers = 10
    records = 1000

class HashThreadedTransactions(ThreadedTransactionsBase):
    """Threaded transaction test using a DB_HASH database."""
    dbtype = db.DB_HASH
    writers = 2
    readers = 10
    records = 1000

class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """Threaded transaction test on DB_BTREE with DB_TXN_NOWAIT transactions."""
    dbtype = db.DB_BTREE
    writers = 2
    readers = 10
    records = 1000
    txnFlag = db.DB_TXN_NOWAIT

class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """Threaded transaction test on DB_HASH with DB_TXN_NOWAIT transactions."""
    dbtype = db.DB_HASH
    writers = 2
    readers = 10
    records = 1000
    txnFlag = db.DB_TXN_NOWAIT


#----------------------------------------------------------------------

def test_suite():
    suite = unittest.TestSuite()

    if have_threads:
        suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
        suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
        suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
        suite.addTest(unittest.makeSuite(HashSimpleThreaded))
        suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
        suite.addTest(unittest.makeSuite(HashThreadedTransactions))
        suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
        suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))

    else:
        print "Threads not available, skipping thread tests."

    return suite


# When the file is executed directly, run the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')