|
1 """TestCases for multi-threaded access to a DB. |
|
2 """ |
|
3 |
|
4 import os |
|
5 import sys |
|
6 import time |
|
7 import errno |
|
8 from random import random |
|
9 |
|
# Separator that makeData() places between the repeated copies of a key.
DASH = '-'
|
11 |
|
# Probe for the WindowsError name; where it is not defined, create a
# stand-in Exception subclass so the name exists at module level.
try:
    WindowsError
except NameError:
    class WindowsError(Exception):
        pass
|
17 |
|
18 import unittest |
|
19 from test_all import db, dbutils, test_support, verbose, have_threads, \ |
|
20 get_new_environment_path, get_new_database_path |
|
21 |
|
22 if have_threads : |
|
23 from threading import Thread |
|
24 import sys |
|
25 if sys.version_info[0] < 3 : |
|
26 from threading import currentThread |
|
27 else : |
|
28 from threading import current_thread as currentThread |
|
29 |
|
30 |
|
31 #---------------------------------------------------------------------- |
|
32 |
|
class BaseThreadedTestCase(unittest.TestCase):
    """Common fixture for the threaded tests.

    setUp() creates a DBEnv in a new home directory and opens one DB in
    it; tearDown() closes both and removes the directory.  Derived
    classes tune the fixture through the class attributes below and by
    overriding setEnvOpts().
    """
    dbtype = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags = 0         # OR-ed with DB_CREATE for DB.open()
    dbsetflags = 0          # passed to DB.set_flags() when non-zero
    envflags = 0            # OR-ed with DB_CREATE for DBEnv.open()

    # On Python older than 2.4, provide assertTrue() in terms of
    # failUnless().
    if sys.version_info[:3] < (2, 4, 0):
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

    def setUp(self):
        """Create the environment and open the database under test."""
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        self.homeDir = get_new_environment_path()
        self.env = db.DBEnv()
        # Hook for derived classes; runs before the environment is opened.
        self.setEnvOpts()
        self.env.open(self.homeDir, self.envflags | db.DB_CREATE)

        self.filename = self.__class__.__name__ + '.db'
        self.d = db.DB(self.env)
        if self.dbsetflags:
            self.d.set_flags(self.dbsetflags)
        self.d.open(self.filename, self.dbtype,
                    self.dbopenflags | db.DB_CREATE)

    def tearDown(self):
        """Close the database and environment, then delete the home dir.

        Each step runs even if the previous one raised, so a failing
        close() does not leave the environment open or the directory
        behind.
        """
        try:
            self.d.close()
        finally:
            try:
                self.env.close()
            finally:
                test_support.rmtree(self.homeDir)

    def setEnvOpts(self):
        """Configure self.env before it is opened; no-op by default."""
        pass

    def makeData(self, key):
        """Return the data stored for key: five copies joined by DASH."""
        return DASH.join([key] * 5)
|
69 |
|
70 |
|
71 #---------------------------------------------------------------------- |
|
72 |
|
73 |
|
74 class ConcurrentDataStoreBase(BaseThreadedTestCase): |
|
75 dbopenflags = db.DB_THREAD |
|
76 envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL |
|
77 readers = 0 # derived class should set |
|
78 writers = 0 |
|
79 records = 1000 |
|
80 |
|
81 def test01_1WriterMultiReaders(self): |
|
82 if verbose: |
|
83 print '\n', '-=' * 30 |
|
84 print "Running %s.test01_1WriterMultiReaders..." % \ |
|
85 self.__class__.__name__ |
|
86 |
|
87 keys=range(self.records) |
|
88 import random |
|
89 random.shuffle(keys) |
|
90 records_per_writer=self.records//self.writers |
|
91 readers_per_writer=self.readers//self.writers |
|
92 self.assertEqual(self.records,self.writers*records_per_writer) |
|
93 self.assertEqual(self.readers,self.writers*readers_per_writer) |
|
94 self.assertTrue((records_per_writer%readers_per_writer)==0) |
|
95 readers = [] |
|
96 |
|
97 for x in xrange(self.readers): |
|
98 rt = Thread(target = self.readerThread, |
|
99 args = (self.d, x), |
|
100 name = 'reader %d' % x, |
|
101 )#verbose = verbose) |
|
102 import sys |
|
103 if sys.version_info[0] < 3 : |
|
104 rt.setDaemon(True) |
|
105 else : |
|
106 rt.daemon = True |
|
107 readers.append(rt) |
|
108 |
|
109 writers=[] |
|
110 for x in xrange(self.writers): |
|
111 a=keys[records_per_writer*x:records_per_writer*(x+1)] |
|
112 a.sort() # Generate conflicts |
|
113 b=readers[readers_per_writer*x:readers_per_writer*(x+1)] |
|
114 wt = Thread(target = self.writerThread, |
|
115 args = (self.d, a, b), |
|
116 name = 'writer %d' % x, |
|
117 )#verbose = verbose) |
|
118 writers.append(wt) |
|
119 |
|
120 for t in writers: |
|
121 import sys |
|
122 if sys.version_info[0] < 3 : |
|
123 t.setDaemon(True) |
|
124 else : |
|
125 t.daemon = True |
|
126 t.start() |
|
127 |
|
128 for t in writers: |
|
129 t.join() |
|
130 for t in readers: |
|
131 t.join() |
|
132 |
|
133 def writerThread(self, d, keys, readers): |
|
134 import sys |
|
135 if sys.version_info[0] < 3 : |
|
136 name = currentThread().getName() |
|
137 else : |
|
138 name = currentThread().name |
|
139 |
|
140 if verbose: |
|
141 print "%s: creating records %d - %d" % (name, start, stop) |
|
142 |
|
143 count=len(keys)//len(readers) |
|
144 count2=count |
|
145 for x in keys : |
|
146 key = '%04d' % x |
|
147 dbutils.DeadlockWrap(d.put, key, self.makeData(key), |
|
148 max_retries=12) |
|
149 if verbose and x % 100 == 0: |
|
150 print "%s: records %d - %d finished" % (name, start, x) |
|
151 |
|
152 count2-=1 |
|
153 if not count2 : |
|
154 readers.pop().start() |
|
155 count2=count |
|
156 |
|
157 if verbose: |
|
158 print "%s: finished creating records" % name |
|
159 |
|
160 if verbose: |
|
161 print "%s: thread finished" % name |
|
162 |
|
163 def readerThread(self, d, readerNum): |
|
164 import sys |
|
165 if sys.version_info[0] < 3 : |
|
166 name = currentThread().getName() |
|
167 else : |
|
168 name = currentThread().name |
|
169 |
|
170 for i in xrange(5) : |
|
171 c = d.cursor() |
|
172 count = 0 |
|
173 rec = c.first() |
|
174 while rec: |
|
175 count += 1 |
|
176 key, data = rec |
|
177 self.assertEqual(self.makeData(key), data) |
|
178 rec = c.next() |
|
179 if verbose: |
|
180 print "%s: found %d records" % (name, count) |
|
181 c.close() |
|
182 |
|
183 if verbose: |
|
184 print "%s: thread finished" % name |
|
185 |
|
186 |
|
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """ConcurrentDataStoreBase on a DB_BTREE database."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_BTREE
|
192 |
|
193 |
|
class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """ConcurrentDataStoreBase on a DB_HASH database."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_HASH
|
199 |
|
200 |
|
201 #---------------------------------------------------------------------- |
|
202 |
|
203 class SimpleThreadedBase(BaseThreadedTestCase): |
|
204 dbopenflags = db.DB_THREAD |
|
205 envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
|
206 readers = 10 |
|
207 writers = 2 |
|
208 records = 1000 |
|
209 |
|
210 def setEnvOpts(self): |
|
211 self.env.set_lk_detect(db.DB_LOCK_DEFAULT) |
|
212 |
|
213 def test02_SimpleLocks(self): |
|
214 if verbose: |
|
215 print '\n', '-=' * 30 |
|
216 print "Running %s.test02_SimpleLocks..." % self.__class__.__name__ |
|
217 |
|
218 |
|
219 keys=range(self.records) |
|
220 import random |
|
221 random.shuffle(keys) |
|
222 records_per_writer=self.records//self.writers |
|
223 readers_per_writer=self.readers//self.writers |
|
224 self.assertEqual(self.records,self.writers*records_per_writer) |
|
225 self.assertEqual(self.readers,self.writers*readers_per_writer) |
|
226 self.assertTrue((records_per_writer%readers_per_writer)==0) |
|
227 |
|
228 readers = [] |
|
229 for x in xrange(self.readers): |
|
230 rt = Thread(target = self.readerThread, |
|
231 args = (self.d, x), |
|
232 name = 'reader %d' % x, |
|
233 )#verbose = verbose) |
|
234 import sys |
|
235 if sys.version_info[0] < 3 : |
|
236 rt.setDaemon(True) |
|
237 else : |
|
238 rt.daemon = True |
|
239 readers.append(rt) |
|
240 |
|
241 writers = [] |
|
242 for x in xrange(self.writers): |
|
243 a=keys[records_per_writer*x:records_per_writer*(x+1)] |
|
244 a.sort() # Generate conflicts |
|
245 b=readers[readers_per_writer*x:readers_per_writer*(x+1)] |
|
246 wt = Thread(target = self.writerThread, |
|
247 args = (self.d, a, b), |
|
248 name = 'writer %d' % x, |
|
249 )#verbose = verbose) |
|
250 writers.append(wt) |
|
251 |
|
252 for t in writers: |
|
253 import sys |
|
254 if sys.version_info[0] < 3 : |
|
255 t.setDaemon(True) |
|
256 else : |
|
257 t.daemon = True |
|
258 t.start() |
|
259 |
|
260 for t in writers: |
|
261 t.join() |
|
262 for t in readers: |
|
263 t.join() |
|
264 |
|
265 def writerThread(self, d, keys, readers): |
|
266 import sys |
|
267 if sys.version_info[0] < 3 : |
|
268 name = currentThread().getName() |
|
269 else : |
|
270 name = currentThread().name |
|
271 if verbose: |
|
272 print "%s: creating records %d - %d" % (name, start, stop) |
|
273 |
|
274 count=len(keys)//len(readers) |
|
275 count2=count |
|
276 for x in keys : |
|
277 key = '%04d' % x |
|
278 dbutils.DeadlockWrap(d.put, key, self.makeData(key), |
|
279 max_retries=12) |
|
280 |
|
281 if verbose and x % 100 == 0: |
|
282 print "%s: records %d - %d finished" % (name, start, x) |
|
283 |
|
284 count2-=1 |
|
285 if not count2 : |
|
286 readers.pop().start() |
|
287 count2=count |
|
288 |
|
289 if verbose: |
|
290 print "%s: thread finished" % name |
|
291 |
|
292 def readerThread(self, d, readerNum): |
|
293 import sys |
|
294 if sys.version_info[0] < 3 : |
|
295 name = currentThread().getName() |
|
296 else : |
|
297 name = currentThread().name |
|
298 |
|
299 c = d.cursor() |
|
300 count = 0 |
|
301 rec = dbutils.DeadlockWrap(c.first, max_retries=10) |
|
302 while rec: |
|
303 count += 1 |
|
304 key, data = rec |
|
305 self.assertEqual(self.makeData(key), data) |
|
306 rec = dbutils.DeadlockWrap(c.next, max_retries=10) |
|
307 if verbose: |
|
308 print "%s: found %d records" % (name, count) |
|
309 c.close() |
|
310 |
|
311 if verbose: |
|
312 print "%s: thread finished" % name |
|
313 |
|
314 |
|
class BTreeSimpleThreaded(SimpleThreadedBase):
    """SimpleThreadedBase on a DB_BTREE database."""
    dbtype = db.DB_BTREE
|
317 |
|
318 |
|
class HashSimpleThreaded(SimpleThreadedBase):
    """SimpleThreadedBase on a DB_HASH database."""
    dbtype = db.DB_HASH
|
321 |
|
322 |
|
323 #---------------------------------------------------------------------- |
|
324 |
|
325 |
|
326 class ThreadedTransactionsBase(BaseThreadedTestCase): |
|
327 dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT |
|
328 envflags = (db.DB_THREAD | |
|
329 db.DB_INIT_MPOOL | |
|
330 db.DB_INIT_LOCK | |
|
331 db.DB_INIT_LOG | |
|
332 db.DB_INIT_TXN |
|
333 ) |
|
334 readers = 0 |
|
335 writers = 0 |
|
336 records = 2000 |
|
337 txnFlag = 0 |
|
338 |
|
339 def setEnvOpts(self): |
|
340 #self.env.set_lk_detect(db.DB_LOCK_DEFAULT) |
|
341 pass |
|
342 |
|
343 def test03_ThreadedTransactions(self): |
|
344 if verbose: |
|
345 print '\n', '-=' * 30 |
|
346 print "Running %s.test03_ThreadedTransactions..." % \ |
|
347 self.__class__.__name__ |
|
348 |
|
349 keys=range(self.records) |
|
350 import random |
|
351 random.shuffle(keys) |
|
352 records_per_writer=self.records//self.writers |
|
353 readers_per_writer=self.readers//self.writers |
|
354 self.assertEqual(self.records,self.writers*records_per_writer) |
|
355 self.assertEqual(self.readers,self.writers*readers_per_writer) |
|
356 self.assertTrue((records_per_writer%readers_per_writer)==0) |
|
357 |
|
358 readers=[] |
|
359 for x in xrange(self.readers): |
|
360 rt = Thread(target = self.readerThread, |
|
361 args = (self.d, x), |
|
362 name = 'reader %d' % x, |
|
363 )#verbose = verbose) |
|
364 import sys |
|
365 if sys.version_info[0] < 3 : |
|
366 rt.setDaemon(True) |
|
367 else : |
|
368 rt.daemon = True |
|
369 readers.append(rt) |
|
370 |
|
371 writers = [] |
|
372 for x in xrange(self.writers): |
|
373 a=keys[records_per_writer*x:records_per_writer*(x+1)] |
|
374 b=readers[readers_per_writer*x:readers_per_writer*(x+1)] |
|
375 wt = Thread(target = self.writerThread, |
|
376 args = (self.d, a, b), |
|
377 name = 'writer %d' % x, |
|
378 )#verbose = verbose) |
|
379 writers.append(wt) |
|
380 |
|
381 dt = Thread(target = self.deadlockThread) |
|
382 import sys |
|
383 if sys.version_info[0] < 3 : |
|
384 dt.setDaemon(True) |
|
385 else : |
|
386 dt.daemon = True |
|
387 dt.start() |
|
388 |
|
389 for t in writers: |
|
390 import sys |
|
391 if sys.version_info[0] < 3 : |
|
392 t.setDaemon(True) |
|
393 else : |
|
394 t.daemon = True |
|
395 t.start() |
|
396 |
|
397 for t in writers: |
|
398 t.join() |
|
399 for t in readers: |
|
400 t.join() |
|
401 |
|
402 self.doLockDetect = False |
|
403 dt.join() |
|
404 |
|
405 def writerThread(self, d, keys, readers): |
|
406 import sys |
|
407 if sys.version_info[0] < 3 : |
|
408 name = currentThread().getName() |
|
409 else : |
|
410 name = currentThread().name |
|
411 |
|
412 count=len(keys)//len(readers) |
|
413 while len(keys): |
|
414 try: |
|
415 txn = self.env.txn_begin(None, self.txnFlag) |
|
416 keys2=keys[:count] |
|
417 for x in keys2 : |
|
418 key = '%04d' % x |
|
419 d.put(key, self.makeData(key), txn) |
|
420 if verbose and x % 100 == 0: |
|
421 print "%s: records %d - %d finished" % (name, start, x) |
|
422 txn.commit() |
|
423 keys=keys[count:] |
|
424 readers.pop().start() |
|
425 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: |
|
426 if verbose: |
|
427 print "%s: Aborting transaction (%s)" % (name, val[1]) |
|
428 txn.abort() |
|
429 |
|
430 if verbose: |
|
431 print "%s: thread finished" % name |
|
432 |
|
433 def readerThread(self, d, readerNum): |
|
434 import sys |
|
435 if sys.version_info[0] < 3 : |
|
436 name = currentThread().getName() |
|
437 else : |
|
438 name = currentThread().name |
|
439 |
|
440 finished = False |
|
441 while not finished: |
|
442 try: |
|
443 txn = self.env.txn_begin(None, self.txnFlag) |
|
444 c = d.cursor(txn) |
|
445 count = 0 |
|
446 rec = c.first() |
|
447 while rec: |
|
448 count += 1 |
|
449 key, data = rec |
|
450 self.assertEqual(self.makeData(key), data) |
|
451 rec = c.next() |
|
452 if verbose: print "%s: found %d records" % (name, count) |
|
453 c.close() |
|
454 txn.commit() |
|
455 finished = True |
|
456 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: |
|
457 if verbose: |
|
458 print "%s: Aborting transaction (%s)" % (name, val[1]) |
|
459 c.close() |
|
460 txn.abort() |
|
461 |
|
462 if verbose: |
|
463 print "%s: thread finished" % name |
|
464 |
|
465 def deadlockThread(self): |
|
466 self.doLockDetect = True |
|
467 while self.doLockDetect: |
|
468 time.sleep(0.05) |
|
469 try: |
|
470 aborted = self.env.lock_detect( |
|
471 db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT) |
|
472 if verbose and aborted: |
|
473 print "deadlock: Aborted %d deadlocked transaction(s)" \ |
|
474 % aborted |
|
475 except db.DBError: |
|
476 pass |
|
477 |
|
478 |
|
class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_BTREE database."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_BTREE
|
484 |
|
class HashThreadedTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_HASH database."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_HASH
|
490 |
|
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_BTREE database, with
    transactions begun with DB_TXN_NOWAIT."""
    txnFlag = db.DB_TXN_NOWAIT
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_BTREE
|
497 |
|
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_HASH database, with
    transactions begun with DB_TXN_NOWAIT."""
    txnFlag = db.DB_TXN_NOWAIT
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_HASH
|
504 |
|
505 |
|
506 #---------------------------------------------------------------------- |
|
507 |
|
508 def test_suite(): |
|
509 suite = unittest.TestSuite() |
|
510 |
|
511 if have_threads: |
|
512 suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore)) |
|
513 suite.addTest(unittest.makeSuite(HashConcurrentDataStore)) |
|
514 suite.addTest(unittest.makeSuite(BTreeSimpleThreaded)) |
|
515 suite.addTest(unittest.makeSuite(HashSimpleThreaded)) |
|
516 suite.addTest(unittest.makeSuite(BTreeThreadedTransactions)) |
|
517 suite.addTest(unittest.makeSuite(HashThreadedTransactions)) |
|
518 suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions)) |
|
519 suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions)) |
|
520 |
|
521 else: |
|
522 print "Threads not available, skipping thread tests." |
|
523 |
|
524 return suite |
|
525 |
|
526 |
|
# When executed as a script, run the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')