symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_distributed_transactions.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """TestCases for distributed transactions.
       
     2 """
       
     3 
       
     4 import os
       
     5 import unittest
       
     6 
       
     7 from test_all import db, test_support, get_new_environment_path, \
       
     8         get_new_database_path
       
     9 
       
    10 try :
       
    11     a=set()
       
    12 except : # Python 2.3
       
    13     from sets import Set as set
       
    14 else :
       
    15     del a
       
    16 
       
    17 from test_all import verbose
       
    18 
       
    19 #----------------------------------------------------------------------
       
    20 
       
    21 class DBTxn_distributed(unittest.TestCase):
       
    22     num_txns=1234
       
    23     nosync=True
       
    24     must_open_db=False
       
    25     def _create_env(self, must_open_db) :
       
    26         self.dbenv = db.DBEnv()
       
    27         self.dbenv.set_tx_max(self.num_txns)
       
    28         self.dbenv.set_lk_max_lockers(self.num_txns*2)
       
    29         self.dbenv.set_lk_max_locks(self.num_txns*2)
       
    30         self.dbenv.set_lk_max_objects(self.num_txns*2)
       
    31         if self.nosync :
       
    32             self.dbenv.set_flags(db.DB_TXN_NOSYNC,True)
       
    33         self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD |
       
    34                 db.DB_RECOVER |
       
    35                 db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
       
    36                 db.DB_INIT_LOCK, 0666)
       
    37         self.db = db.DB(self.dbenv)
       
    38         self.db.set_re_len(db.DB_XIDDATASIZE)
       
    39         if must_open_db :
       
    40             if db.version() > (4,1) :
       
    41                 txn=self.dbenv.txn_begin()
       
    42                 self.db.open(self.filename,
       
    43                         db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666,
       
    44                         txn=txn)
       
    45                 txn.commit()
       
    46             else :
       
    47                 self.db.open(self.filename,
       
    48                         db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666)
       
    49 
       
    50     def setUp(self) :
       
    51         self.homeDir = get_new_environment_path()
       
    52         self.filename = "test"
       
    53         return self._create_env(must_open_db=True)
       
    54 
       
    55     def _destroy_env(self):
       
    56         if self.nosync or (db.version()[:2] == (4,6)):  # Known bug
       
    57             self.dbenv.log_flush()
       
    58         self.db.close()
       
    59         self.dbenv.close()
       
    60 
       
    61     def tearDown(self):
       
    62         self._destroy_env()
       
    63         test_support.rmtree(self.homeDir)
       
    64 
       
    65     def _recreate_env(self,must_open_db) :
       
    66         self._destroy_env()
       
    67         self._create_env(must_open_db)
       
    68 
       
    69     def test01_distributed_transactions(self) :
       
    70         txns=set()
       
    71         adapt = lambda x : x
       
    72         import sys
       
    73         if sys.version_info[0] >= 3 :
       
    74             adapt = lambda x : bytes(x, "ascii")
       
    75     # Create transactions, "prepare" them, and
       
    76     # let them be garbage collected.
       
    77         for i in xrange(self.num_txns) :
       
    78             txn = self.dbenv.txn_begin()
       
    79             gid = "%%%dd" %db.DB_XIDDATASIZE
       
    80             gid = adapt(gid %i)
       
    81             self.db.put(i, gid, txn=txn, flags=db.DB_APPEND)
       
    82             txns.add(gid)
       
    83             txn.prepare(gid)
       
    84         del txn
       
    85 
       
    86         self._recreate_env(self.must_open_db)
       
    87 
       
    88     # Get "to be recovered" transactions but
       
    89     # let them be garbage collected.
       
    90         recovered_txns=self.dbenv.txn_recover()
       
    91         self.assertEquals(self.num_txns,len(recovered_txns))
       
    92         for gid,txn in recovered_txns :
       
    93             self.assert_(gid in txns)
       
    94         del txn
       
    95         del recovered_txns
       
    96 
       
    97         self._recreate_env(self.must_open_db)
       
    98 
       
    99     # Get "to be recovered" transactions. Commit, abort and
       
   100     # discard them.
       
   101         recovered_txns=self.dbenv.txn_recover()
       
   102         self.assertEquals(self.num_txns,len(recovered_txns))
       
   103         discard_txns=set()
       
   104         committed_txns=set()
       
   105         state=0
       
   106         for gid,txn in recovered_txns :
       
   107             if state==0 or state==1:
       
   108                 committed_txns.add(gid)
       
   109                 txn.commit()
       
   110             elif state==2 :
       
   111                 txn.abort()
       
   112             elif state==3 :
       
   113                 txn.discard()
       
   114                 discard_txns.add(gid)
       
   115                 state=-1
       
   116             state+=1
       
   117         del txn
       
   118         del recovered_txns
       
   119 
       
   120         self._recreate_env(self.must_open_db)
       
   121 
       
   122     # Verify the discarded transactions are still
       
   123     # around, and dispose them.
       
   124         recovered_txns=self.dbenv.txn_recover()
       
   125         self.assertEquals(len(discard_txns),len(recovered_txns))
       
   126         for gid,txn in recovered_txns :
       
   127             txn.abort()
       
   128         del txn
       
   129         del recovered_txns
       
   130 
       
   131         self._recreate_env(must_open_db=True)
       
   132 
       
   133     # Be sure there are not pending transactions.
       
   134     # Check also database size.
       
   135         recovered_txns=self.dbenv.txn_recover()
       
   136         self.assert_(len(recovered_txns)==0)
       
   137         self.assertEquals(len(committed_txns),self.db.stat()["nkeys"])
       
   138 
       
   139 class DBTxn_distributedSYNC(DBTxn_distributed):
       
   140     nosync=False
       
   141 
       
   142 class DBTxn_distributed_must_open_db(DBTxn_distributed):
       
   143     must_open_db=True
       
   144 
       
   145 class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed):
       
   146     nosync=False
       
   147     must_open_db=True
       
   148 
       
   149 #----------------------------------------------------------------------
       
   150 
       
   151 def test_suite():
       
   152     suite = unittest.TestSuite()
       
   153     if db.version() >= (4,5) :
       
   154         suite.addTest(unittest.makeSuite(DBTxn_distributed))
       
   155         suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC))
       
   156     if db.version() >= (4,6) :
       
   157         suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db))
       
   158         suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db))
       
   159     return suite
       
   160 
       
   161 
       
   162 if __name__ == '__main__':
       
   163     unittest.main(defaultTest='test_suite')