|
1 """TestCases for distributed transactions. |
|
2 """ |
|
3 |
|
4 import os |
|
5 import unittest |
|
6 |
|
7 from test_all import db, test_support, get_new_environment_path, \ |
|
8 get_new_database_path |
|
9 |
|
10 try : |
|
11 a=set() |
|
12 except : # Python 2.3 |
|
13 from sets import Set as set |
|
14 else : |
|
15 del a |
|
16 |
|
17 from test_all import verbose |
|
18 |
|
19 #---------------------------------------------------------------------- |
|
20 |
|
21 class DBTxn_distributed(unittest.TestCase): |
|
22 num_txns=1234 |
|
23 nosync=True |
|
24 must_open_db=False |
|
25 def _create_env(self, must_open_db) : |
|
26 self.dbenv = db.DBEnv() |
|
27 self.dbenv.set_tx_max(self.num_txns) |
|
28 self.dbenv.set_lk_max_lockers(self.num_txns*2) |
|
29 self.dbenv.set_lk_max_locks(self.num_txns*2) |
|
30 self.dbenv.set_lk_max_objects(self.num_txns*2) |
|
31 if self.nosync : |
|
32 self.dbenv.set_flags(db.DB_TXN_NOSYNC,True) |
|
33 self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD | |
|
34 db.DB_RECOVER | |
|
35 db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL | |
|
36 db.DB_INIT_LOCK, 0666) |
|
37 self.db = db.DB(self.dbenv) |
|
38 self.db.set_re_len(db.DB_XIDDATASIZE) |
|
39 if must_open_db : |
|
40 if db.version() > (4,1) : |
|
41 txn=self.dbenv.txn_begin() |
|
42 self.db.open(self.filename, |
|
43 db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666, |
|
44 txn=txn) |
|
45 txn.commit() |
|
46 else : |
|
47 self.db.open(self.filename, |
|
48 db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666) |
|
49 |
|
50 def setUp(self) : |
|
51 self.homeDir = get_new_environment_path() |
|
52 self.filename = "test" |
|
53 return self._create_env(must_open_db=True) |
|
54 |
|
55 def _destroy_env(self): |
|
56 if self.nosync or (db.version()[:2] == (4,6)): # Known bug |
|
57 self.dbenv.log_flush() |
|
58 self.db.close() |
|
59 self.dbenv.close() |
|
60 |
|
61 def tearDown(self): |
|
62 self._destroy_env() |
|
63 test_support.rmtree(self.homeDir) |
|
64 |
|
65 def _recreate_env(self,must_open_db) : |
|
66 self._destroy_env() |
|
67 self._create_env(must_open_db) |
|
68 |
|
69 def test01_distributed_transactions(self) : |
|
70 txns=set() |
|
71 adapt = lambda x : x |
|
72 import sys |
|
73 if sys.version_info[0] >= 3 : |
|
74 adapt = lambda x : bytes(x, "ascii") |
|
75 # Create transactions, "prepare" them, and |
|
76 # let them be garbage collected. |
|
77 for i in xrange(self.num_txns) : |
|
78 txn = self.dbenv.txn_begin() |
|
79 gid = "%%%dd" %db.DB_XIDDATASIZE |
|
80 gid = adapt(gid %i) |
|
81 self.db.put(i, gid, txn=txn, flags=db.DB_APPEND) |
|
82 txns.add(gid) |
|
83 txn.prepare(gid) |
|
84 del txn |
|
85 |
|
86 self._recreate_env(self.must_open_db) |
|
87 |
|
88 # Get "to be recovered" transactions but |
|
89 # let them be garbage collected. |
|
90 recovered_txns=self.dbenv.txn_recover() |
|
91 self.assertEquals(self.num_txns,len(recovered_txns)) |
|
92 for gid,txn in recovered_txns : |
|
93 self.assert_(gid in txns) |
|
94 del txn |
|
95 del recovered_txns |
|
96 |
|
97 self._recreate_env(self.must_open_db) |
|
98 |
|
99 # Get "to be recovered" transactions. Commit, abort and |
|
100 # discard them. |
|
101 recovered_txns=self.dbenv.txn_recover() |
|
102 self.assertEquals(self.num_txns,len(recovered_txns)) |
|
103 discard_txns=set() |
|
104 committed_txns=set() |
|
105 state=0 |
|
106 for gid,txn in recovered_txns : |
|
107 if state==0 or state==1: |
|
108 committed_txns.add(gid) |
|
109 txn.commit() |
|
110 elif state==2 : |
|
111 txn.abort() |
|
112 elif state==3 : |
|
113 txn.discard() |
|
114 discard_txns.add(gid) |
|
115 state=-1 |
|
116 state+=1 |
|
117 del txn |
|
118 del recovered_txns |
|
119 |
|
120 self._recreate_env(self.must_open_db) |
|
121 |
|
122 # Verify the discarded transactions are still |
|
123 # around, and dispose them. |
|
124 recovered_txns=self.dbenv.txn_recover() |
|
125 self.assertEquals(len(discard_txns),len(recovered_txns)) |
|
126 for gid,txn in recovered_txns : |
|
127 txn.abort() |
|
128 del txn |
|
129 del recovered_txns |
|
130 |
|
131 self._recreate_env(must_open_db=True) |
|
132 |
|
133 # Be sure there are not pending transactions. |
|
134 # Check also database size. |
|
135 recovered_txns=self.dbenv.txn_recover() |
|
136 self.assert_(len(recovered_txns)==0) |
|
137 self.assertEquals(len(committed_txns),self.db.stat()["nkeys"]) |
|
138 |
|
139 class DBTxn_distributedSYNC(DBTxn_distributed): |
|
140 nosync=False |
|
141 |
|
142 class DBTxn_distributed_must_open_db(DBTxn_distributed): |
|
143 must_open_db=True |
|
144 |
|
145 class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed): |
|
146 nosync=False |
|
147 must_open_db=True |
|
148 |
|
149 #---------------------------------------------------------------------- |
|
150 |
|
151 def test_suite(): |
|
152 suite = unittest.TestSuite() |
|
153 if db.version() >= (4,5) : |
|
154 suite.addTest(unittest.makeSuite(DBTxn_distributed)) |
|
155 suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC)) |
|
156 if db.version() >= (4,6) : |
|
157 suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db)) |
|
158 suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db)) |
|
159 return suite |
|
160 |
|
161 |
|
162 if __name__ == '__main__': |
|
163 unittest.main(defaultTest='test_suite') |