|
1 """TestCases for distributed transactions. |
|
2 """ |
|
3 |
|
4 import os |
|
5 import time |
|
6 import unittest |
|
7 |
|
8 from test_all import db, test_support, have_threads, verbose, \ |
|
9 get_new_environment_path, get_new_database_path |
|
10 |
|
11 |
|
12 #---------------------------------------------------------------------- |
|
13 |
|
14 class DBReplicationManager(unittest.TestCase): |
|
15 import sys |
|
16 if sys.version_info[:3] < (2, 4, 0): |
|
17 def assertTrue(self, expr, msg=None): |
|
18 self.failUnless(expr,msg=msg) |
|
19 |
|
20 def setUp(self) : |
|
21 self.homeDirMaster = get_new_environment_path() |
|
22 self.homeDirClient = get_new_environment_path() |
|
23 |
|
24 self.dbenvMaster = db.DBEnv() |
|
25 self.dbenvClient = db.DBEnv() |
|
26 |
|
27 # Must use "DB_THREAD" because the Replication Manager will |
|
28 # be executed in other threads but will use the same environment. |
|
29 # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0 |
|
30 self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN |
|
31 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | |
|
32 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666) |
|
33 self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN |
|
34 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | |
|
35 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666) |
|
36 |
|
37 self.confirmed_master=self.client_startupdone=False |
|
38 def confirmed_master(a,b,c) : |
|
39 if b==db.DB_EVENT_REP_MASTER : |
|
40 self.confirmed_master=True |
|
41 |
|
42 def client_startupdone(a,b,c) : |
|
43 if b==db.DB_EVENT_REP_STARTUPDONE : |
|
44 self.client_startupdone=True |
|
45 |
|
46 self.dbenvMaster.set_event_notify(confirmed_master) |
|
47 self.dbenvClient.set_event_notify(client_startupdone) |
|
48 |
|
49 #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True) |
|
50 #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True) |
|
51 #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True) |
|
52 #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True) |
|
53 |
|
54 self.dbMaster = self.dbClient = None |
|
55 |
|
56 |
|
57 def tearDown(self): |
|
58 if self.dbClient : |
|
59 self.dbClient.close() |
|
60 if self.dbMaster : |
|
61 self.dbMaster.close() |
|
62 self.dbenvClient.close() |
|
63 self.dbenvMaster.close() |
|
64 test_support.rmtree(self.homeDirClient) |
|
65 test_support.rmtree(self.homeDirMaster) |
|
66 |
|
67 def test01_basic_replication(self) : |
|
68 master_port = test_support.find_unused_port() |
|
69 self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port) |
|
70 client_port = test_support.find_unused_port() |
|
71 self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port) |
|
72 self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port) |
|
73 self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port) |
|
74 self.dbenvMaster.rep_set_nsites(2) |
|
75 self.dbenvClient.rep_set_nsites(2) |
|
76 self.dbenvMaster.rep_set_priority(10) |
|
77 self.dbenvClient.rep_set_priority(0) |
|
78 |
|
79 self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123) |
|
80 self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321) |
|
81 self.assertEquals(self.dbenvMaster.rep_get_timeout( |
|
82 db.DB_REP_CONNECTION_RETRY), 100123) |
|
83 self.assertEquals(self.dbenvClient.rep_get_timeout( |
|
84 db.DB_REP_CONNECTION_RETRY), 100321) |
|
85 |
|
86 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234) |
|
87 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432) |
|
88 self.assertEquals(self.dbenvMaster.rep_get_timeout( |
|
89 db.DB_REP_ELECTION_TIMEOUT), 100234) |
|
90 self.assertEquals(self.dbenvClient.rep_get_timeout( |
|
91 db.DB_REP_ELECTION_TIMEOUT), 100432) |
|
92 |
|
93 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345) |
|
94 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543) |
|
95 self.assertEquals(self.dbenvMaster.rep_get_timeout( |
|
96 db.DB_REP_ELECTION_RETRY), 100345) |
|
97 self.assertEquals(self.dbenvClient.rep_get_timeout( |
|
98 db.DB_REP_ELECTION_RETRY), 100543) |
|
99 |
|
100 self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL) |
|
101 self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL) |
|
102 |
|
103 self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER); |
|
104 self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT); |
|
105 |
|
106 self.assertEquals(self.dbenvMaster.rep_get_nsites(),2) |
|
107 self.assertEquals(self.dbenvClient.rep_get_nsites(),2) |
|
108 self.assertEquals(self.dbenvMaster.rep_get_priority(),10) |
|
109 self.assertEquals(self.dbenvClient.rep_get_priority(),0) |
|
110 self.assertEquals(self.dbenvMaster.repmgr_get_ack_policy(), |
|
111 db.DB_REPMGR_ACKS_ALL) |
|
112 self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(), |
|
113 db.DB_REPMGR_ACKS_ALL) |
|
114 |
|
115 # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE |
|
116 # is not generated if the master has no new transactions. |
|
117 # This is solved in BDB 4.6 (#15542). |
|
118 import time |
|
119 timeout = time.time()+10 |
|
120 while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) : |
|
121 time.sleep(0.02) |
|
122 # this fails on Windows as self.client_startupdone never gets set |
|
123 # to True - see bug 3892. BUT - even though this assertion |
|
124 # fails on Windows the rest of the test passes - so to prove |
|
125 # that we let the rest of the test run. Sadly we can't |
|
126 # make use of raising TestSkipped() here (unittest still |
|
127 # reports it as an error), so we yell to stderr. |
|
128 import sys |
|
129 if sys.platform=="win32": |
|
130 print >> sys.stderr, \ |
|
131 "XXX - windows bsddb replication fails on windows and is skipped" |
|
132 print >> sys.stderr, "XXX - Please see issue #3892" |
|
133 else: |
|
134 self.assertTrue(time.time()<timeout) |
|
135 |
|
136 d = self.dbenvMaster.repmgr_site_list() |
|
137 self.assertEquals(len(d), 1) |
|
138 self.assertEquals(d[0][0], "127.0.0.1") |
|
139 self.assertEquals(d[0][1], client_port) |
|
140 self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \ |
|
141 (d[0][2]==db.DB_REPMGR_DISCONNECTED)) |
|
142 |
|
143 d = self.dbenvClient.repmgr_site_list() |
|
144 self.assertEquals(len(d), 1) |
|
145 self.assertEquals(d[0][0], "127.0.0.1") |
|
146 self.assertEquals(d[0][1], master_port) |
|
147 self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \ |
|
148 (d[0][2]==db.DB_REPMGR_DISCONNECTED)) |
|
149 |
|
150 if db.version() >= (4,6) : |
|
151 d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR); |
|
152 self.assertTrue("msgs_queued" in d) |
|
153 |
|
154 self.dbMaster=db.DB(self.dbenvMaster) |
|
155 txn=self.dbenvMaster.txn_begin() |
|
156 self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn) |
|
157 txn.commit() |
|
158 |
|
159 import time,os.path |
|
160 timeout=time.time()+10 |
|
161 while (time.time()<timeout) and \ |
|
162 not (os.path.exists(os.path.join(self.homeDirClient,"test"))) : |
|
163 time.sleep(0.01) |
|
164 |
|
165 self.dbClient=db.DB(self.dbenvClient) |
|
166 while True : |
|
167 txn=self.dbenvClient.txn_begin() |
|
168 try : |
|
169 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY, |
|
170 mode=0666, txn=txn) |
|
171 except db.DBRepHandleDeadError : |
|
172 txn.abort() |
|
173 self.dbClient.close() |
|
174 self.dbClient=db.DB(self.dbenvClient) |
|
175 continue |
|
176 |
|
177 txn.commit() |
|
178 break |
|
179 |
|
180 txn=self.dbenvMaster.txn_begin() |
|
181 self.dbMaster.put("ABC", "123", txn=txn) |
|
182 txn.commit() |
|
183 import time |
|
184 timeout=time.time()+10 |
|
185 v=None |
|
186 while (time.time()<timeout) and (v==None) : |
|
187 txn=self.dbenvClient.txn_begin() |
|
188 v=self.dbClient.get("ABC", txn=txn) |
|
189 txn.commit() |
|
190 if v==None : |
|
191 time.sleep(0.02) |
|
192 self.assertTrue(time.time()<timeout) |
|
193 self.assertEquals("123", v) |
|
194 |
|
195 txn=self.dbenvMaster.txn_begin() |
|
196 self.dbMaster.delete("ABC", txn=txn) |
|
197 txn.commit() |
|
198 timeout=time.time()+10 |
|
199 while (time.time()<timeout) and (v!=None) : |
|
200 txn=self.dbenvClient.txn_begin() |
|
201 v=self.dbClient.get("ABC", txn=txn) |
|
202 txn.commit() |
|
203 if v==None : |
|
204 time.sleep(0.02) |
|
205 self.assertTrue(time.time()<timeout) |
|
206 self.assertEquals(None, v) |
|
207 |
|
208 class DBBaseReplication(DBReplicationManager): |
|
209 def setUp(self) : |
|
210 DBReplicationManager.setUp(self) |
|
211 def confirmed_master(a,b,c) : |
|
212 if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) : |
|
213 self.confirmed_master = True |
|
214 |
|
215 def client_startupdone(a,b,c) : |
|
216 if b == db.DB_EVENT_REP_STARTUPDONE : |
|
217 self.client_startupdone = True |
|
218 |
|
219 self.dbenvMaster.set_event_notify(confirmed_master) |
|
220 self.dbenvClient.set_event_notify(client_startupdone) |
|
221 |
|
222 import Queue |
|
223 self.m2c = Queue.Queue() |
|
224 self.c2m = Queue.Queue() |
|
225 |
|
226 # There are only two nodes, so we don't need to |
|
227 # do any routing decision |
|
228 def m2c(dbenv, control, rec, lsnp, envid, flags) : |
|
229 self.m2c.put((control, rec)) |
|
230 |
|
231 def c2m(dbenv, control, rec, lsnp, envid, flags) : |
|
232 self.c2m.put((control, rec)) |
|
233 |
|
234 self.dbenvMaster.rep_set_transport(13,m2c) |
|
235 self.dbenvMaster.rep_set_priority(10) |
|
236 self.dbenvClient.rep_set_transport(3,c2m) |
|
237 self.dbenvClient.rep_set_priority(0) |
|
238 |
|
239 self.assertEquals(self.dbenvMaster.rep_get_priority(),10) |
|
240 self.assertEquals(self.dbenvClient.rep_get_priority(),0) |
|
241 |
|
242 #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True) |
|
243 #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True) |
|
244 #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True) |
|
245 #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True) |
|
246 |
|
247 def thread_master() : |
|
248 return self.thread_do(self.dbenvMaster, self.c2m, 3, |
|
249 self.master_doing_election, True) |
|
250 |
|
251 def thread_client() : |
|
252 return self.thread_do(self.dbenvClient, self.m2c, 13, |
|
253 self.client_doing_election, False) |
|
254 |
|
255 from threading import Thread |
|
256 t_m=Thread(target=thread_master) |
|
257 t_c=Thread(target=thread_client) |
|
258 import sys |
|
259 if sys.version_info[0] < 3 : |
|
260 t_m.setDaemon(True) |
|
261 t_c.setDaemon(True) |
|
262 else : |
|
263 t_m.daemon = True |
|
264 t_c.daemon = True |
|
265 |
|
266 self.t_m = t_m |
|
267 self.t_c = t_c |
|
268 |
|
269 self.dbMaster = self.dbClient = None |
|
270 |
|
271 self.master_doing_election=[False] |
|
272 self.client_doing_election=[False] |
|
273 |
|
274 |
|
275 def tearDown(self): |
|
276 if self.dbClient : |
|
277 self.dbClient.close() |
|
278 if self.dbMaster : |
|
279 self.dbMaster.close() |
|
280 self.m2c.put(None) |
|
281 self.c2m.put(None) |
|
282 self.t_m.join() |
|
283 self.t_c.join() |
|
284 self.dbenvClient.close() |
|
285 self.dbenvMaster.close() |
|
286 test_support.rmtree(self.homeDirClient) |
|
287 test_support.rmtree(self.homeDirMaster) |
|
288 |
|
289 def basic_rep_threading(self) : |
|
290 self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER) |
|
291 self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT) |
|
292 |
|
293 def thread_do(env, q, envid, election_status, must_be_master) : |
|
294 while True : |
|
295 v=q.get() |
|
296 if v == None : return |
|
297 env.rep_process_message(v[0], v[1], envid) |
|
298 |
|
299 self.thread_do = thread_do |
|
300 |
|
301 self.t_m.start() |
|
302 self.t_c.start() |
|
303 |
|
304 def test01_basic_replication(self) : |
|
305 self.basic_rep_threading() |
|
306 |
|
307 # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE |
|
308 # is not generated if the master has no new transactions. |
|
309 # This is solved in BDB 4.6 (#15542). |
|
310 import time |
|
311 timeout = time.time()+10 |
|
312 while (time.time()<timeout) and not (self.confirmed_master and |
|
313 self.client_startupdone) : |
|
314 time.sleep(0.02) |
|
315 self.assertTrue(time.time()<timeout) |
|
316 |
|
317 self.dbMaster=db.DB(self.dbenvMaster) |
|
318 txn=self.dbenvMaster.txn_begin() |
|
319 self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn) |
|
320 txn.commit() |
|
321 |
|
322 import time,os.path |
|
323 timeout=time.time()+10 |
|
324 while (time.time()<timeout) and \ |
|
325 not (os.path.exists(os.path.join(self.homeDirClient,"test"))) : |
|
326 time.sleep(0.01) |
|
327 |
|
328 self.dbClient=db.DB(self.dbenvClient) |
|
329 while True : |
|
330 txn=self.dbenvClient.txn_begin() |
|
331 try : |
|
332 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY, |
|
333 mode=0666, txn=txn) |
|
334 except db.DBRepHandleDeadError : |
|
335 txn.abort() |
|
336 self.dbClient.close() |
|
337 self.dbClient=db.DB(self.dbenvClient) |
|
338 continue |
|
339 |
|
340 txn.commit() |
|
341 break |
|
342 |
|
343 txn=self.dbenvMaster.txn_begin() |
|
344 self.dbMaster.put("ABC", "123", txn=txn) |
|
345 txn.commit() |
|
346 import time |
|
347 timeout=time.time()+10 |
|
348 v=None |
|
349 while (time.time()<timeout) and (v==None) : |
|
350 txn=self.dbenvClient.txn_begin() |
|
351 v=self.dbClient.get("ABC", txn=txn) |
|
352 txn.commit() |
|
353 if v==None : |
|
354 time.sleep(0.02) |
|
355 self.assertTrue(time.time()<timeout) |
|
356 self.assertEquals("123", v) |
|
357 |
|
358 txn=self.dbenvMaster.txn_begin() |
|
359 self.dbMaster.delete("ABC", txn=txn) |
|
360 txn.commit() |
|
361 timeout=time.time()+10 |
|
362 while (time.time()<timeout) and (v!=None) : |
|
363 txn=self.dbenvClient.txn_begin() |
|
364 v=self.dbClient.get("ABC", txn=txn) |
|
365 txn.commit() |
|
366 if v==None : |
|
367 time.sleep(0.02) |
|
368 self.assertTrue(time.time()<timeout) |
|
369 self.assertEquals(None, v) |
|
370 |
|
371 if db.version() >= (4,7) : |
|
372 def test02_test_request(self) : |
|
373 self.basic_rep_threading() |
|
374 (minimum, maximum) = self.dbenvClient.rep_get_request() |
|
375 self.dbenvClient.rep_set_request(minimum-1, maximum+1) |
|
376 self.assertEqual(self.dbenvClient.rep_get_request(), |
|
377 (minimum-1, maximum+1)) |
|
378 |
|
379 if db.version() >= (4,6) : |
|
380 def test03_master_election(self) : |
|
381 # Get ready to hold an election |
|
382 #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER) |
|
383 self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT) |
|
384 self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT) |
|
385 |
|
386 def thread_do(env, q, envid, election_status, must_be_master) : |
|
387 while True : |
|
388 v=q.get() |
|
389 if v == None : return |
|
390 r = env.rep_process_message(v[0],v[1],envid) |
|
391 if must_be_master and self.confirmed_master : |
|
392 self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER) |
|
393 must_be_master = False |
|
394 |
|
395 if r[0] == db.DB_REP_HOLDELECTION : |
|
396 def elect() : |
|
397 while True : |
|
398 try : |
|
399 env.rep_elect(2, 1) |
|
400 election_status[0] = False |
|
401 break |
|
402 except db.DBRepUnavailError : |
|
403 pass |
|
404 if not election_status[0] and not self.confirmed_master : |
|
405 from threading import Thread |
|
406 election_status[0] = True |
|
407 t=Thread(target=elect) |
|
408 import sys |
|
409 if sys.version_info[0] < 3 : |
|
410 t.setDaemon(True) |
|
411 else : |
|
412 t.daemon = True |
|
413 t.start() |
|
414 |
|
415 self.thread_do = thread_do |
|
416 |
|
417 self.t_m.start() |
|
418 self.t_c.start() |
|
419 |
|
420 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000) |
|
421 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000) |
|
422 self.client_doing_election[0] = True |
|
423 while True : |
|
424 try : |
|
425 self.dbenvClient.rep_elect(2, 1) |
|
426 self.client_doing_election[0] = False |
|
427 break |
|
428 except db.DBRepUnavailError : |
|
429 pass |
|
430 |
|
431 self.assertTrue(self.confirmed_master) |
|
432 |
|
433 #---------------------------------------------------------------------- |
|
434 |
|
435 def test_suite(): |
|
436 suite = unittest.TestSuite() |
|
437 if db.version() >= (4, 6) : |
|
438 dbenv = db.DBEnv() |
|
439 try : |
|
440 dbenv.repmgr_get_ack_policy() |
|
441 ReplicationManager_available=True |
|
442 except : |
|
443 ReplicationManager_available=False |
|
444 dbenv.close() |
|
445 del dbenv |
|
446 if ReplicationManager_available : |
|
447 suite.addTest(unittest.makeSuite(DBReplicationManager)) |
|
448 |
|
449 if have_threads : |
|
450 suite.addTest(unittest.makeSuite(DBBaseReplication)) |
|
451 |
|
452 return suite |
|
453 |
|
454 |
|
455 if __name__ == '__main__': |
|
456 unittest.main(defaultTest='test_suite') |