|
1 import unittest |
|
2 import os |
|
3 |
|
4 from test_all import db, test_support, get_new_environment_path, get_new_database_path |
|
5 |
|
6 |
|
7 class DBSequenceTest(unittest.TestCase): |
|
8 import sys |
|
9 if sys.version_info[:3] < (2, 4, 0): |
|
10 def assertTrue(self, expr, msg=None): |
|
11 self.failUnless(expr,msg=msg) |
|
12 |
|
13 def setUp(self): |
|
14 self.int_32_max = 0x100000000 |
|
15 self.homeDir = get_new_environment_path() |
|
16 self.filename = "test" |
|
17 |
|
18 self.dbenv = db.DBEnv() |
|
19 self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666) |
|
20 self.d = db.DB(self.dbenv) |
|
21 self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666) |
|
22 |
|
23 def tearDown(self): |
|
24 if hasattr(self, 'seq'): |
|
25 self.seq.close() |
|
26 del self.seq |
|
27 if hasattr(self, 'd'): |
|
28 self.d.close() |
|
29 del self.d |
|
30 if hasattr(self, 'dbenv'): |
|
31 self.dbenv.close() |
|
32 del self.dbenv |
|
33 |
|
34 test_support.rmtree(self.homeDir) |
|
35 |
|
36 def test_get(self): |
|
37 self.seq = db.DBSequence(self.d, flags=0) |
|
38 start_value = 10 * self.int_32_max |
|
39 self.assertEqual(0xA00000000, start_value) |
|
40 self.assertEquals(None, self.seq.init_value(start_value)) |
|
41 self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE)) |
|
42 self.assertEquals(start_value, self.seq.get(5)) |
|
43 self.assertEquals(start_value + 5, self.seq.get()) |
|
44 |
|
45 def test_remove(self): |
|
46 self.seq = db.DBSequence(self.d, flags=0) |
|
47 self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|
48 self.assertEquals(None, self.seq.remove(txn=None, flags=0)) |
|
49 del self.seq |
|
50 |
|
51 def test_get_key(self): |
|
52 self.seq = db.DBSequence(self.d, flags=0) |
|
53 key = 'foo' |
|
54 self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE)) |
|
55 self.assertEquals(key, self.seq.get_key()) |
|
56 |
|
57 def test_get_dbp(self): |
|
58 self.seq = db.DBSequence(self.d, flags=0) |
|
59 self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|
60 self.assertEquals(self.d, self.seq.get_dbp()) |
|
61 |
|
62 def test_cachesize(self): |
|
63 self.seq = db.DBSequence(self.d, flags=0) |
|
64 cashe_size = 10 |
|
65 self.assertEquals(None, self.seq.set_cachesize(cashe_size)) |
|
66 self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|
67 self.assertEquals(cashe_size, self.seq.get_cachesize()) |
|
68 |
|
69 def test_flags(self): |
|
70 self.seq = db.DBSequence(self.d, flags=0) |
|
71 flag = db.DB_SEQ_WRAP; |
|
72 self.assertEquals(None, self.seq.set_flags(flag)) |
|
73 self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|
74 self.assertEquals(flag, self.seq.get_flags() & flag) |
|
75 |
|
76 def test_range(self): |
|
77 self.seq = db.DBSequence(self.d, flags=0) |
|
78 seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1) |
|
79 self.assertEquals(None, self.seq.set_range(seq_range)) |
|
80 self.seq.init_value(seq_range[0]) |
|
81 self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|
82 self.assertEquals(seq_range, self.seq.get_range()) |
|
83 |
|
84 def test_stat(self): |
|
85 self.seq = db.DBSequence(self.d, flags=0) |
|
86 self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|
87 stat = self.seq.stat() |
|
88 for param in ('nowait', 'min', 'max', 'value', 'current', |
|
89 'flags', 'cache_size', 'last_value', 'wait'): |
|
90 self.assertTrue(param in stat, "parameter %s isn't in stat info" % param) |
|
91 |
|
92 if db.version() >= (4,7) : |
|
93 # This code checks a crash solved in Berkeley DB 4.7 |
|
94 def test_stat_crash(self) : |
|
95 d=db.DB() |
|
96 d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE) # In RAM |
|
97 seq = db.DBSequence(d, flags=0) |
|
98 |
|
99 self.assertRaises(db.DBNotFoundError, seq.open, |
|
100 key='id', txn=None, flags=0) |
|
101 |
|
102 self.assertRaises(db.DBInvalidArgError, seq.stat) |
|
103 |
|
104 d.close() |
|
105 |
|
106 def test_64bits(self) : |
|
107 # We don't use both extremes because they are problematic |
|
108 value_plus=(1L<<63)-2 |
|
109 self.assertEquals(9223372036854775806L,value_plus) |
|
110 value_minus=(-1L<<63)+1 # Two complement |
|
111 self.assertEquals(-9223372036854775807L,value_minus) |
|
112 self.seq = db.DBSequence(self.d, flags=0) |
|
113 self.assertEquals(None, self.seq.init_value(value_plus-1)) |
|
114 self.assertEquals(None, self.seq.open(key='id', txn=None, |
|
115 flags=db.DB_CREATE)) |
|
116 self.assertEquals(value_plus-1, self.seq.get(1)) |
|
117 self.assertEquals(value_plus, self.seq.get(1)) |
|
118 |
|
119 self.seq.remove(txn=None, flags=0) |
|
120 |
|
121 self.seq = db.DBSequence(self.d, flags=0) |
|
122 self.assertEquals(None, self.seq.init_value(value_minus)) |
|
123 self.assertEquals(None, self.seq.open(key='id', txn=None, |
|
124 flags=db.DB_CREATE)) |
|
125 self.assertEquals(value_minus, self.seq.get(1)) |
|
126 self.assertEquals(value_minus+1, self.seq.get(1)) |
|
127 |
|
128 def test_multiple_close(self): |
|
129 self.seq = db.DBSequence(self.d) |
|
130 self.seq.close() # You can close a Sequence multiple times |
|
131 self.seq.close() |
|
132 self.seq.close() |
|
133 |
|
def test_suite():
    """Return the suite for this module.

    The suite contains the DBSequenceTest cases when db.version() is at
    least (4, 3); otherwise it is returned empty.
    """
    tests = unittest.TestSuite()
    if db.version() < (4, 3):
        return tests
    sequence_tests = unittest.makeSuite(DBSequenceTest)
    tests.addTest(sequence_tests)
    return tests
|
139 |
|
140 |
|
if __name__ == "__main__":
    # Run through test_suite() so its Berkeley DB version check applies.
    unittest.main(defaultTest="test_suite")