symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_thread.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """TestCases for multi-threaded access to a DB.
       
     2 """
       
     3 
       
     4 import os
       
     5 import sys
       
     6 import time
       
     7 import errno
       
     8 from random import random
       
     9 
       
    10 DASH = '-'
       
    11 
       
    12 try:
       
    13     WindowsError
       
    14 except NameError:
       
    15     class WindowsError(Exception):
       
    16         pass
       
    17 
       
    18 import unittest
       
    19 from test_all import db, dbutils, test_support, verbose, have_threads, \
       
    20         get_new_environment_path, get_new_database_path
       
    21 
       
    22 if have_threads :
       
    23     from threading import Thread
       
    24     import sys
       
    25     if sys.version_info[0] < 3 :
       
    26         from threading import currentThread
       
    27     else :
       
    28         from threading import current_thread as currentThread
       
    29 
       
    30 
       
    31 #----------------------------------------------------------------------
       
    32 
       
    33 class BaseThreadedTestCase(unittest.TestCase):
       
    34     dbtype       = db.DB_UNKNOWN  # must be set in derived class
       
    35     dbopenflags  = 0
       
    36     dbsetflags   = 0
       
    37     envflags     = 0
       
    38 
       
    39     import sys
       
    40     if sys.version_info[:3] < (2, 4, 0):
       
    41         def assertTrue(self, expr, msg=None):
       
    42             self.failUnless(expr,msg=msg)
       
    43 
       
    44     def setUp(self):
       
    45         if verbose:
       
    46             dbutils._deadlock_VerboseFile = sys.stdout
       
    47 
       
    48         self.homeDir = get_new_environment_path()
       
    49         self.env = db.DBEnv()
       
    50         self.setEnvOpts()
       
    51         self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
       
    52 
       
    53         self.filename = self.__class__.__name__ + '.db'
       
    54         self.d = db.DB(self.env)
       
    55         if self.dbsetflags:
       
    56             self.d.set_flags(self.dbsetflags)
       
    57         self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
       
    58 
       
    59     def tearDown(self):
       
    60         self.d.close()
       
    61         self.env.close()
       
    62         test_support.rmtree(self.homeDir)
       
    63 
       
    64     def setEnvOpts(self):
       
    65         pass
       
    66 
       
    67     def makeData(self, key):
       
    68         return DASH.join([key] * 5)
       
    69 
       
    70 
       
    71 #----------------------------------------------------------------------
       
    72 
       
    73 
       
    74 class ConcurrentDataStoreBase(BaseThreadedTestCase):
       
    75     dbopenflags = db.DB_THREAD
       
    76     envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
       
    77     readers     = 0 # derived class should set
       
    78     writers     = 0
       
    79     records     = 1000
       
    80 
       
    81     def test01_1WriterMultiReaders(self):
       
    82         if verbose:
       
    83             print '\n', '-=' * 30
       
    84             print "Running %s.test01_1WriterMultiReaders..." % \
       
    85                   self.__class__.__name__
       
    86 
       
    87         keys=range(self.records)
       
    88         import random
       
    89         random.shuffle(keys)
       
    90         records_per_writer=self.records//self.writers
       
    91         readers_per_writer=self.readers//self.writers
       
    92         self.assertEqual(self.records,self.writers*records_per_writer)
       
    93         self.assertEqual(self.readers,self.writers*readers_per_writer)
       
    94         self.assertTrue((records_per_writer%readers_per_writer)==0)
       
    95         readers = []
       
    96 
       
    97         for x in xrange(self.readers):
       
    98             rt = Thread(target = self.readerThread,
       
    99                         args = (self.d, x),
       
   100                         name = 'reader %d' % x,
       
   101                         )#verbose = verbose)
       
   102             import sys
       
   103             if sys.version_info[0] < 3 :
       
   104                 rt.setDaemon(True)
       
   105             else :
       
   106                 rt.daemon = True
       
   107             readers.append(rt)
       
   108 
       
   109         writers=[]
       
   110         for x in xrange(self.writers):
       
   111             a=keys[records_per_writer*x:records_per_writer*(x+1)]
       
   112             a.sort()  # Generate conflicts
       
   113             b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
       
   114             wt = Thread(target = self.writerThread,
       
   115                         args = (self.d, a, b),
       
   116                         name = 'writer %d' % x,
       
   117                         )#verbose = verbose)
       
   118             writers.append(wt)
       
   119 
       
   120         for t in writers:
       
   121             import sys
       
   122             if sys.version_info[0] < 3 :
       
   123                 t.setDaemon(True)
       
   124             else :
       
   125                 t.daemon = True
       
   126             t.start()
       
   127 
       
   128         for t in writers:
       
   129             t.join()
       
   130         for t in readers:
       
   131             t.join()
       
   132 
       
   133     def writerThread(self, d, keys, readers):
       
   134         import sys
       
   135         if sys.version_info[0] < 3 :
       
   136             name = currentThread().getName()
       
   137         else :
       
   138             name = currentThread().name
       
   139 
       
   140         if verbose:
       
   141             print "%s: creating records %d - %d" % (name, start, stop)
       
   142 
       
   143         count=len(keys)//len(readers)
       
   144         count2=count
       
   145         for x in keys :
       
   146             key = '%04d' % x
       
   147             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
       
   148                                  max_retries=12)
       
   149             if verbose and x % 100 == 0:
       
   150                 print "%s: records %d - %d finished" % (name, start, x)
       
   151 
       
   152             count2-=1
       
   153             if not count2 :
       
   154                 readers.pop().start()
       
   155                 count2=count
       
   156 
       
   157         if verbose:
       
   158             print "%s: finished creating records" % name
       
   159 
       
   160         if verbose:
       
   161             print "%s: thread finished" % name
       
   162 
       
   163     def readerThread(self, d, readerNum):
       
   164         import sys
       
   165         if sys.version_info[0] < 3 :
       
   166             name = currentThread().getName()
       
   167         else :
       
   168             name = currentThread().name
       
   169 
       
   170         for i in xrange(5) :
       
   171             c = d.cursor()
       
   172             count = 0
       
   173             rec = c.first()
       
   174             while rec:
       
   175                 count += 1
       
   176                 key, data = rec
       
   177                 self.assertEqual(self.makeData(key), data)
       
   178                 rec = c.next()
       
   179             if verbose:
       
   180                 print "%s: found %d records" % (name, count)
       
   181             c.close()
       
   182 
       
   183         if verbose:
       
   184             print "%s: thread finished" % name
       
   185 
       
   186 
       
   187 class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
       
   188     dbtype  = db.DB_BTREE
       
   189     writers = 2
       
   190     readers = 10
       
   191     records = 1000
       
   192 
       
   193 
       
   194 class HashConcurrentDataStore(ConcurrentDataStoreBase):
       
   195     dbtype  = db.DB_HASH
       
   196     writers = 2
       
   197     readers = 10
       
   198     records = 1000
       
   199 
       
   200 
       
   201 #----------------------------------------------------------------------
       
   202 
       
   203 class SimpleThreadedBase(BaseThreadedTestCase):
       
   204     dbopenflags = db.DB_THREAD
       
   205     envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
       
   206     readers = 10
       
   207     writers = 2
       
   208     records = 1000
       
   209 
       
   210     def setEnvOpts(self):
       
   211         self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
       
   212 
       
   213     def test02_SimpleLocks(self):
       
   214         if verbose:
       
   215             print '\n', '-=' * 30
       
   216             print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
       
   217 
       
   218 
       
   219         keys=range(self.records)
       
   220         import random
       
   221         random.shuffle(keys)
       
   222         records_per_writer=self.records//self.writers
       
   223         readers_per_writer=self.readers//self.writers
       
   224         self.assertEqual(self.records,self.writers*records_per_writer)
       
   225         self.assertEqual(self.readers,self.writers*readers_per_writer)
       
   226         self.assertTrue((records_per_writer%readers_per_writer)==0)
       
   227 
       
   228         readers = []
       
   229         for x in xrange(self.readers):
       
   230             rt = Thread(target = self.readerThread,
       
   231                         args = (self.d, x),
       
   232                         name = 'reader %d' % x,
       
   233                         )#verbose = verbose)
       
   234             import sys
       
   235             if sys.version_info[0] < 3 :
       
   236                 rt.setDaemon(True)
       
   237             else :
       
   238                 rt.daemon = True
       
   239             readers.append(rt)
       
   240 
       
   241         writers = []
       
   242         for x in xrange(self.writers):
       
   243             a=keys[records_per_writer*x:records_per_writer*(x+1)]
       
   244             a.sort()  # Generate conflicts
       
   245             b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
       
   246             wt = Thread(target = self.writerThread,
       
   247                         args = (self.d, a, b),
       
   248                         name = 'writer %d' % x,
       
   249                         )#verbose = verbose)
       
   250             writers.append(wt)
       
   251 
       
   252         for t in writers:
       
   253             import sys
       
   254             if sys.version_info[0] < 3 :
       
   255                 t.setDaemon(True)
       
   256             else :
       
   257                 t.daemon = True
       
   258             t.start()
       
   259 
       
   260         for t in writers:
       
   261             t.join()
       
   262         for t in readers:
       
   263             t.join()
       
   264 
       
   265     def writerThread(self, d, keys, readers):
       
   266         import sys
       
   267         if sys.version_info[0] < 3 :
       
   268             name = currentThread().getName()
       
   269         else :
       
   270             name = currentThread().name
       
   271         if verbose:
       
   272             print "%s: creating records %d - %d" % (name, start, stop)
       
   273 
       
   274         count=len(keys)//len(readers)
       
   275         count2=count
       
   276         for x in keys :
       
   277             key = '%04d' % x
       
   278             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
       
   279                                  max_retries=12)
       
   280 
       
   281             if verbose and x % 100 == 0:
       
   282                 print "%s: records %d - %d finished" % (name, start, x)
       
   283 
       
   284             count2-=1
       
   285             if not count2 :
       
   286                 readers.pop().start()
       
   287                 count2=count
       
   288 
       
   289         if verbose:
       
   290             print "%s: thread finished" % name
       
   291 
       
   292     def readerThread(self, d, readerNum):
       
   293         import sys
       
   294         if sys.version_info[0] < 3 :
       
   295             name = currentThread().getName()
       
   296         else :
       
   297             name = currentThread().name
       
   298 
       
   299         c = d.cursor()
       
   300         count = 0
       
   301         rec = dbutils.DeadlockWrap(c.first, max_retries=10)
       
   302         while rec:
       
   303             count += 1
       
   304             key, data = rec
       
   305             self.assertEqual(self.makeData(key), data)
       
   306             rec = dbutils.DeadlockWrap(c.next, max_retries=10)
       
   307         if verbose:
       
   308             print "%s: found %d records" % (name, count)
       
   309         c.close()
       
   310 
       
   311         if verbose:
       
   312             print "%s: thread finished" % name
       
   313 
       
   314 
       
   315 class BTreeSimpleThreaded(SimpleThreadedBase):
       
   316     dbtype = db.DB_BTREE
       
   317 
       
   318 
       
   319 class HashSimpleThreaded(SimpleThreadedBase):
       
   320     dbtype = db.DB_HASH
       
   321 
       
   322 
       
   323 #----------------------------------------------------------------------
       
   324 
       
   325 
       
   326 class ThreadedTransactionsBase(BaseThreadedTestCase):
       
   327     dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
       
   328     envflags    = (db.DB_THREAD |
       
   329                    db.DB_INIT_MPOOL |
       
   330                    db.DB_INIT_LOCK |
       
   331                    db.DB_INIT_LOG |
       
   332                    db.DB_INIT_TXN
       
   333                    )
       
   334     readers = 0
       
   335     writers = 0
       
   336     records = 2000
       
   337     txnFlag = 0
       
   338 
       
   339     def setEnvOpts(self):
       
   340         #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
       
   341         pass
       
   342 
       
   343     def test03_ThreadedTransactions(self):
       
   344         if verbose:
       
   345             print '\n', '-=' * 30
       
   346             print "Running %s.test03_ThreadedTransactions..." % \
       
   347                   self.__class__.__name__
       
   348 
       
   349         keys=range(self.records)
       
   350         import random
       
   351         random.shuffle(keys)
       
   352         records_per_writer=self.records//self.writers
       
   353         readers_per_writer=self.readers//self.writers
       
   354         self.assertEqual(self.records,self.writers*records_per_writer)
       
   355         self.assertEqual(self.readers,self.writers*readers_per_writer)
       
   356         self.assertTrue((records_per_writer%readers_per_writer)==0)
       
   357 
       
   358         readers=[]
       
   359         for x in xrange(self.readers):
       
   360             rt = Thread(target = self.readerThread,
       
   361                         args = (self.d, x),
       
   362                         name = 'reader %d' % x,
       
   363                         )#verbose = verbose)
       
   364             import sys
       
   365             if sys.version_info[0] < 3 :
       
   366                 rt.setDaemon(True)
       
   367             else :
       
   368                 rt.daemon = True
       
   369             readers.append(rt)
       
   370 
       
   371         writers = []
       
   372         for x in xrange(self.writers):
       
   373             a=keys[records_per_writer*x:records_per_writer*(x+1)]
       
   374             b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
       
   375             wt = Thread(target = self.writerThread,
       
   376                         args = (self.d, a, b),
       
   377                         name = 'writer %d' % x,
       
   378                         )#verbose = verbose)
       
   379             writers.append(wt)
       
   380 
       
   381         dt = Thread(target = self.deadlockThread)
       
   382         import sys
       
   383         if sys.version_info[0] < 3 :
       
   384             dt.setDaemon(True)
       
   385         else :
       
   386             dt.daemon = True
       
   387         dt.start()
       
   388 
       
   389         for t in writers:
       
   390             import sys
       
   391             if sys.version_info[0] < 3 :
       
   392                 t.setDaemon(True)
       
   393             else :
       
   394                 t.daemon = True
       
   395             t.start()
       
   396 
       
   397         for t in writers:
       
   398             t.join()
       
   399         for t in readers:
       
   400             t.join()
       
   401 
       
   402         self.doLockDetect = False
       
   403         dt.join()
       
   404 
       
   405     def writerThread(self, d, keys, readers):
       
   406         import sys
       
   407         if sys.version_info[0] < 3 :
       
   408             name = currentThread().getName()
       
   409         else :
       
   410             name = currentThread().name
       
   411 
       
   412         count=len(keys)//len(readers)
       
   413         while len(keys):
       
   414             try:
       
   415                 txn = self.env.txn_begin(None, self.txnFlag)
       
   416                 keys2=keys[:count]
       
   417                 for x in keys2 :
       
   418                     key = '%04d' % x
       
   419                     d.put(key, self.makeData(key), txn)
       
   420                     if verbose and x % 100 == 0:
       
   421                         print "%s: records %d - %d finished" % (name, start, x)
       
   422                 txn.commit()
       
   423                 keys=keys[count:]
       
   424                 readers.pop().start()
       
   425             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
       
   426                 if verbose:
       
   427                     print "%s: Aborting transaction (%s)" % (name, val[1])
       
   428                 txn.abort()
       
   429 
       
   430         if verbose:
       
   431             print "%s: thread finished" % name
       
   432 
       
   433     def readerThread(self, d, readerNum):
       
   434         import sys
       
   435         if sys.version_info[0] < 3 :
       
   436             name = currentThread().getName()
       
   437         else :
       
   438             name = currentThread().name
       
   439 
       
   440         finished = False
       
   441         while not finished:
       
   442             try:
       
   443                 txn = self.env.txn_begin(None, self.txnFlag)
       
   444                 c = d.cursor(txn)
       
   445                 count = 0
       
   446                 rec = c.first()
       
   447                 while rec:
       
   448                     count += 1
       
   449                     key, data = rec
       
   450                     self.assertEqual(self.makeData(key), data)
       
   451                     rec = c.next()
       
   452                 if verbose: print "%s: found %d records" % (name, count)
       
   453                 c.close()
       
   454                 txn.commit()
       
   455                 finished = True
       
   456             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
       
   457                 if verbose:
       
   458                     print "%s: Aborting transaction (%s)" % (name, val[1])
       
   459                 c.close()
       
   460                 txn.abort()
       
   461 
       
   462         if verbose:
       
   463             print "%s: thread finished" % name
       
   464 
       
   465     def deadlockThread(self):
       
   466         self.doLockDetect = True
       
   467         while self.doLockDetect:
       
   468             time.sleep(0.05)
       
   469             try:
       
   470                 aborted = self.env.lock_detect(
       
   471                     db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
       
   472                 if verbose and aborted:
       
   473                     print "deadlock: Aborted %d deadlocked transaction(s)" \
       
   474                           % aborted
       
   475             except db.DBError:
       
   476                 pass
       
   477 
       
   478 
       
   479 class BTreeThreadedTransactions(ThreadedTransactionsBase):
       
   480     dbtype = db.DB_BTREE
       
   481     writers = 2
       
   482     readers = 10
       
   483     records = 1000
       
   484 
       
   485 class HashThreadedTransactions(ThreadedTransactionsBase):
       
   486     dbtype = db.DB_HASH
       
   487     writers = 2
       
   488     readers = 10
       
   489     records = 1000
       
   490 
       
   491 class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
       
   492     dbtype = db.DB_BTREE
       
   493     writers = 2
       
   494     readers = 10
       
   495     records = 1000
       
   496     txnFlag = db.DB_TXN_NOWAIT
       
   497 
       
   498 class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
       
   499     dbtype = db.DB_HASH
       
   500     writers = 2
       
   501     readers = 10
       
   502     records = 1000
       
   503     txnFlag = db.DB_TXN_NOWAIT
       
   504 
       
   505 
       
   506 #----------------------------------------------------------------------
       
   507 
       
   508 def test_suite():
       
   509     suite = unittest.TestSuite()
       
   510 
       
   511     if have_threads:
       
   512         suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
       
   513         suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
       
   514         suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
       
   515         suite.addTest(unittest.makeSuite(HashSimpleThreaded))
       
   516         suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
       
   517         suite.addTest(unittest.makeSuite(HashThreadedTransactions))
       
   518         suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
       
   519         suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
       
   520 
       
   521     else:
       
   522         print "Threads not available, skipping thread tests."
       
   523 
       
   524     return suite
       
   525 
       
   526 
       
   527 if __name__ == '__main__':
       
   528     unittest.main(defaultTest='test_suite')