symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_lock.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """
       
     2 TestCases for testing the locking sub-system.
       
     3 """
       
     4 
       
     5 import time
       
     6 
       
     7 import unittest
       
     8 from test_all import db, test_support, verbose, have_threads, \
       
     9         get_new_environment_path, get_new_database_path
       
    10 
       
    11 if have_threads :
       
    12     from threading import Thread
       
    13     import sys
       
    14     if sys.version_info[0] < 3 :
       
    15         from threading import currentThread
       
    16     else :
       
    17         from threading import current_thread as currentThread
       
    18 
       
    19 #----------------------------------------------------------------------
       
    20 
       
    21 class LockingTestCase(unittest.TestCase):
       
    22     import sys
       
    23     if sys.version_info[:3] < (2, 4, 0):
       
    24         def assertTrue(self, expr, msg=None):
       
    25             self.failUnless(expr,msg=msg)
       
    26 
       
    27 
       
    28     def setUp(self):
       
    29         self.homeDir = get_new_environment_path()
       
    30         self.env = db.DBEnv()
       
    31         self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
       
    32                                     db.DB_INIT_LOCK | db.DB_CREATE)
       
    33 
       
    34 
       
    35     def tearDown(self):
       
    36         self.env.close()
       
    37         test_support.rmtree(self.homeDir)
       
    38 
       
    39 
       
    40     def test01_simple(self):
       
    41         if verbose:
       
    42             print '\n', '-=' * 30
       
    43             print "Running %s.test01_simple..." % self.__class__.__name__
       
    44 
       
    45         anID = self.env.lock_id()
       
    46         if verbose:
       
    47             print "locker ID: %s" % anID
       
    48         lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
       
    49         if verbose:
       
    50             print "Aquired lock: %s" % lock
       
    51         self.env.lock_put(lock)
       
    52         if verbose:
       
    53             print "Released lock: %s" % lock
       
    54         self.env.lock_id_free(anID)
       
    55 
       
    56 
       
    57     def test02_threaded(self):
       
    58         if verbose:
       
    59             print '\n', '-=' * 30
       
    60             print "Running %s.test02_threaded..." % self.__class__.__name__
       
    61 
       
    62         threads = []
       
    63         threads.append(Thread(target = self.theThread,
       
    64                               args=(db.DB_LOCK_WRITE,)))
       
    65         threads.append(Thread(target = self.theThread,
       
    66                               args=(db.DB_LOCK_READ,)))
       
    67         threads.append(Thread(target = self.theThread,
       
    68                               args=(db.DB_LOCK_READ,)))
       
    69         threads.append(Thread(target = self.theThread,
       
    70                               args=(db.DB_LOCK_WRITE,)))
       
    71         threads.append(Thread(target = self.theThread,
       
    72                               args=(db.DB_LOCK_READ,)))
       
    73         threads.append(Thread(target = self.theThread,
       
    74                               args=(db.DB_LOCK_READ,)))
       
    75         threads.append(Thread(target = self.theThread,
       
    76                               args=(db.DB_LOCK_WRITE,)))
       
    77         threads.append(Thread(target = self.theThread,
       
    78                               args=(db.DB_LOCK_WRITE,)))
       
    79         threads.append(Thread(target = self.theThread,
       
    80                               args=(db.DB_LOCK_WRITE,)))
       
    81 
       
    82         for t in threads:
       
    83             import sys
       
    84             if sys.version_info[0] < 3 :
       
    85                 t.setDaemon(True)
       
    86             else :
       
    87                 t.daemon = True
       
    88             t.start()
       
    89         for t in threads:
       
    90             t.join()
       
    91 
       
    92     def test03_lock_timeout(self):
       
    93         self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
       
    94         self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
       
    95         self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
       
    96         self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
       
    97 
       
    98         def deadlock_detection() :
       
    99             while not deadlock_detection.end :
       
   100                 deadlock_detection.count = \
       
   101                     self.env.lock_detect(db.DB_LOCK_EXPIRE)
       
   102                 if deadlock_detection.count :
       
   103                     while not deadlock_detection.end :
       
   104                         pass
       
   105                     break
       
   106                 time.sleep(0.01)
       
   107 
       
   108         deadlock_detection.end=False
       
   109         deadlock_detection.count=0
       
   110         t=Thread(target=deadlock_detection)
       
   111         import sys
       
   112         if sys.version_info[0] < 3 :
       
   113             t.setDaemon(True)
       
   114         else :
       
   115             t.daemon = True
       
   116         t.start()
       
   117         self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
       
   118         anID = self.env.lock_id()
       
   119         anID2 = self.env.lock_id()
       
   120         self.assertNotEqual(anID, anID2)
       
   121         lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
       
   122         start_time=time.time()
       
   123         self.assertRaises(db.DBLockNotGrantedError,
       
   124                 self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
       
   125         end_time=time.time()
       
   126         deadlock_detection.end=True
       
   127         self.assertTrue((end_time-start_time) >= 0.1)
       
   128         self.env.lock_put(lock)
       
   129         t.join()
       
   130 
       
   131         self.env.lock_id_free(anID)
       
   132         self.env.lock_id_free(anID2)
       
   133 
       
   134         if db.version() >= (4,6):
       
   135             self.assertTrue(deadlock_detection.count>0)
       
   136 
       
   137     def theThread(self, lockType):
       
   138         import sys
       
   139         if sys.version_info[0] < 3 :
       
   140             name = currentThread().getName()
       
   141         else :
       
   142             name = currentThread().name
       
   143 
       
   144         if lockType ==  db.DB_LOCK_WRITE:
       
   145             lt = "write"
       
   146         else:
       
   147             lt = "read"
       
   148 
       
   149         anID = self.env.lock_id()
       
   150         if verbose:
       
   151             print "%s: locker ID: %s" % (name, anID)
       
   152 
       
   153         for i in xrange(1000) :
       
   154             lock = self.env.lock_get(anID, "some locked thing", lockType)
       
   155             if verbose:
       
   156                 print "%s: Aquired %s lock: %s" % (name, lt, lock)
       
   157 
       
   158             self.env.lock_put(lock)
       
   159             if verbose:
       
   160                 print "%s: Released %s lock: %s" % (name, lt, lock)
       
   161 
       
   162         self.env.lock_id_free(anID)
       
   163 
       
   164 
       
   165 #----------------------------------------------------------------------
       
   166 
       
   167 def test_suite():
       
   168     suite = unittest.TestSuite()
       
   169 
       
   170     if have_threads:
       
   171         suite.addTest(unittest.makeSuite(LockingTestCase))
       
   172     else:
       
   173         suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
       
   174 
       
   175     return suite
       
   176 
       
   177 
       
# When run as a script, hand control to unittest and let it obtain the
# tests from the module-level test_suite() named by defaultTest.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')