|
1 """ |
|
Test cases for the Python DB Btree key comparison function.
|
3 """ |
|
4 |
|
5 import sys, os, re |
|
6 import test_all |
|
7 from cStringIO import StringIO |
|
8 |
|
9 import unittest |
|
10 |
|
11 from test_all import db, dbshelve, test_support, \ |
|
12 get_new_environment_path, get_new_database_path |
|
13 |
|
14 |
|
# Plain lexical three-way comparison used as the baseline Btree comparator.
# The cmp() builtin is used when it exists; it was removed in Python 3, so an
# equivalent function returning -1, 0 or 1 is defined as a fallback there.
try:
    lexical_cmp = cmp
except NameError:
    def lexical_cmp(left, right):
        """Return -1, 0 or 1 as ``left`` is less than, equal to or greater than ``right``."""
        return (left > right) - (left < right)
|
16 |
|
def lowercase_cmp(left, right):
    """Three-way compare two strings, ignoring case.

    Both operands are lower-cased with ``.lower()`` before being compared.

    Returns -1, 0 or 1 when ``left`` sorts before, equal to or after
    ``right`` respectively.
    """
    lowered_left = left.lower()
    lowered_right = right.lower()
    # Spelled out rather than calling cmp(): that builtin is gone in Python 3,
    # and this expression yields the same -1/0/1 results.
    return (lowered_left > lowered_right) - (lowered_left < lowered_right)
|
19 |
|
def make_reverse_comparator(cmp):
    """Build a comparator that orders items opposite to ``cmp``.

    The returned function takes ``(left, right)`` and returns the negated
    result of ``cmp(left, right)``. The wrapped comparator is bound through
    the ``delegate`` default argument of the returned function.
    """
    def reverse(left, right, delegate=cmp):
        forward_result = delegate(left, right)
        return -forward_result

    return reverse
|
24 |
|
# Keys listed in the order a plain lexical comparison sorts them
# (upper-case letters sort before lower-case ones).
_expected_lexical_test_data = [
    '', 'CCCP', 'a', 'aaa', 'b', 'c', 'cccce', 'ccccf',
]

# Keys listed in the order a case-insensitive comparison sorts them.
_expected_lowercase_test_data = [
    '', 'a', 'aaa', 'b', 'c', 'CC', 'cccce', 'ccccf', 'CCCP',
]
|
27 |
|
class ComparatorTests(unittest.TestCase):
    """Checks the pure-Python comparator functions without touching a database."""

    def comparator_test_helper(self, comparator, expected_data):
        """Sort a copy of ``expected_data`` with ``comparator`` and verify the order.

        ``expected_data`` must already be in the order ``comparator`` defines;
        the test fails if sorting the copy yields a different sequence.
        """
        data = expected_data[:]

        if sys.version_info[0] < 3:
            if sys.version_info[:3] < (2, 4, 0):
                # Before 2.4 the comparator is passed positionally.
                data.sort(comparator)
            else:
                data.sort(cmp=comparator)
        else:
            # Python 3 has no cmp= argument. cmp_to_key honours any positive
            # or negative return value, unlike a check for exactly 1.
            from functools import cmp_to_key
            data.sort(key=cmp_to_key(comparator))

        # assertEqual is available on every unittest version; failUnless was
        # removed in Python 3.12.
        self.assertEqual(data, expected_data,
                         "comparator `%s' is not right: %s vs. %s"
                         % (comparator, expected_data, data))

    def test_lexical_comparator(self):
        """The lexical comparator keeps the lexical fixture in order."""
        self.comparator_test_helper(lexical_cmp, _expected_lexical_test_data)

    def test_reverse_lexical_comparator(self):
        """The reversed lexical comparator keeps the reversed fixture in order."""
        rev = _expected_lexical_test_data[:]
        rev.reverse()
        self.comparator_test_helper(make_reverse_comparator(lexical_cmp),
                                    rev)

    def test_lowercase_comparator(self):
        """The case-insensitive comparator keeps the lowercase fixture in order."""
        self.comparator_test_helper(lowercase_cmp,
                                    _expected_lowercase_test_data)
|
63 |
|
class AbstractBtreeKeyCompareTestCase(unittest.TestCase):
    """Shared fixture for tests that install a Btree key comparator.

    ``setUp`` opens a new ``DBEnv`` in a fresh home directory. ``tearDown``
    closes the database and the environment and removes that directory.
    """
    # Both are replaced per test; None means "nothing open".
    env = None
    db = None

    def setUp(self):
        """Open a DB environment in a newly obtained home directory."""
        self.filename = self.__class__.__name__ + '.db'
        self.homeDir = get_new_environment_path()
        env = db.DBEnv()
        env.open(self.homeDir,
                 db.DB_CREATE | db.DB_INIT_MPOOL
                 | db.DB_INIT_LOCK | db.DB_THREAD)
        self.env = env

    def tearDown(self):
        """Close the database and environment, then remove the home directory.

        The steps are chained with try/finally so that a failure while
        closing does not leave the environment open or the directory behind.
        """
        try:
            self.closeDB()
        finally:
            try:
                if self.env is not None:
                    self.env.close()
                    self.env = None
            finally:
                test_support.rmtree(self.homeDir)

    def addDataToDB(self, data):
        """Store each item of ``data`` as a key, with its position as the value."""
        for i, item in enumerate(data):
            self.db.put(item, str(i))

    def createDB(self, key_comparator):
        """Create a DB handle, install ``key_comparator`` and open it as a Btree."""
        self.db = db.DB(self.env)
        self.setupDB(key_comparator)
        self.db.open(self.filename, "test", db.DB_BTREE, db.DB_CREATE)

    def setupDB(self, key_comparator):
        """Install ``key_comparator`` on the (not yet opened) database handle."""
        self.db.set_bt_compare(key_comparator)

    def closeDB(self):
        """Close the database handle if one is open."""
        if self.db is not None:
            self.db.close()
            self.db = None

    def startTest(self):
        """Hook called at the start of each test; does nothing here."""
        pass

    def finishTest(self, expected=None):
        """Optionally verify the key order against ``expected``, then close the DB."""
        if expected is not None:
            self.check_results(expected)
        self.closeDB()

    def check_results(self, expected):
        """Walk the database with a cursor and compare the keys to ``expected``.

        Fails if the cursor yields more keys, fewer keys, or a key that
        differs from the one at the same position in ``expected``.
        """
        curs = self.db.cursor()
        try:
            index = 0
            rec = curs.first()
            while rec:
                key, ignore = rec
                # Checked first so the indexing below cannot run off the end.
                if index >= len(expected):
                    self.fail("too many values returned from cursor")
                self.assertEqual(expected[index], key,
                                 "expected value `%s' at %d but got `%s'"
                                 % (expected[index], index, key))
                index = index + 1
                rec = curs.next()
            self.assertEqual(index, len(expected),
                             "not enough values returned from cursor")
        finally:
            curs.close()
|
129 |
|
class BtreeKeyCompareTestCase(AbstractBtreeKeyCompareTestCase):
    """Checks that keys come back from the Btree in comparator order."""

    def runCompareTest(self, comparator, data):
        """Create a DB using ``comparator``, insert ``data`` and verify the key order.

        ``data`` is both what gets inserted and the order the cursor walk is
        expected to produce.
        """
        self.startTest()
        self.createDB(comparator)
        self.addDataToDB(data)
        self.finishTest(data)

    def test_lexical_ordering(self):
        """Keys are returned in lexical order under the lexical comparator."""
        self.runCompareTest(lexical_cmp, _expected_lexical_test_data)

    def test_reverse_lexical_ordering(self):
        """Keys are returned in reverse lexical order under the reversed comparator."""
        descending_keys = _expected_lexical_test_data[::-1]
        reversed_cmp = make_reverse_comparator(lexical_cmp)
        self.runCompareTest(reversed_cmp, descending_keys)

    def test_compare_function_useless(self):
        """A comparator that treats every pair as equal collapses all keys into one."""
        self.startTest()

        def socialist_comparator(first, second):
            return 0

        self.createDB(socialist_comparator)
        inserted_keys = ['b', 'a', 'd']
        self.addDataToDB(inserted_keys)
        # all things being equal the first key will be the only key
        # in the database... (with the last key's value fwiw)
        self.finishTest(['b'])
|
155 |
|
156 |
|
class BtreeExceptionsTestCase(AbstractBtreeKeyCompareTestCase):
    """Checks how set_bt_compare and the comparator callback handle bad comparators."""

    def test_raises_non_callable(self):
        """Passing a non-callable comparator raises TypeError."""
        self.startTest()
        self.assertRaises(TypeError, self.createDB, 'abc')
        self.assertRaises(TypeError, self.createDB, None)
        self.finishTest()

    def test_set_bt_compare_with_function(self):
        """A valid comparator function is accepted."""
        self.startTest()
        self.createDB(lexical_cmp)
        self.finishTest()

    def check_results(self, results):
        """Override: these tests do not verify database contents."""
        pass

    def test_compare_function_incorrect(self):
        """A comparator that does not return 0 for equal keys is rejected."""
        self.startTest()

        def bad_comparator(l, r):
            return 1

        # verify that set_bt_compare checks that comparator('', '') == 0
        self.assertRaises(TypeError, self.createDB, bad_comparator)
        self.finishTest()

    def verifyStderr(self, method, successRe):
        """
        Call method() while capturing sys.stderr output internally and
        call self.fail() if successRe.search() does not match the stderr
        output.  This is used to test for uncatchable exceptions.
        """
        stdErr = sys.stderr
        sys.stderr = StringIO()
        try:
            method()
        finally:
            # Always restore the real stderr, even if method() raised.
            temp = sys.stderr
            sys.stderr = stdErr
            errorOut = temp.getvalue()
            if not successRe.search(errorOut):
                self.fail("unexpected stderr output:\n" + errorOut)

    def _test_compare_function_exception(self):
        """Insert keys using a comparator that raises for unequal keys."""
        self.startTest()

        def bad_comparator(l, r):
            if l == r:
                # pass the set_bt_compare test
                return 0
            # Call form of raise: valid on both Python 2 and Python 3.
            raise RuntimeError("i'm a naughty comparison function")

        self.createDB(bad_comparator)
        # Expected to print 2 uncatchable tracebacks to stderr.
        self.addDataToDB(['a', 'b', 'c'])  # this should raise, but...
        self.finishTest()

    def test_compare_function_exception(self):
        """The comparator's RuntimeError shows up twice on stderr."""
        self.verifyStderr(
            self._test_compare_function_exception,
            re.compile('(^RuntimeError:.* naughty.*){2}', re.M | re.S)
        )

    def _test_compare_function_bad_return(self):
        """Insert keys using a comparator that returns a non-integer."""
        self.startTest()

        def bad_comparator(l, r):
            if l == r:
                # pass the set_bt_compare test
                return 0
            return l

        self.createDB(bad_comparator)
        # Expected to print 2 errors about returning an int to stderr.
        self.addDataToDB(['a', 'b', 'c'])  # this should raise, but...
        self.finishTest()

    def test_compare_function_bad_return(self):
        """The 'return an int' TypeError shows up twice on stderr."""
        self.verifyStderr(
            self._test_compare_function_bad_return,
            re.compile('(^TypeError:.* return an int.*){2}', re.M | re.S)
        )

    def test_cannot_assign_twice(self):
        """Calling set_bt_compare again on an opened DB raises RuntimeError."""

        def my_compare(a, b):
            return 0

        self.startTest()
        self.createDB(my_compare)
        # The test fails if the second assignment does not raise.
        self.assertRaises(RuntimeError, self.db.set_bt_compare, my_compare)
|
247 |
|
def test_suite():
    """Return a TestSuite with the tests of the three concrete test case classes.

    Tests are collected from ComparatorTests, BtreeExceptionsTestCase and
    BtreeKeyCompareTestCase, in that order.
    """
    # loadTestsFromTestCase stands in for unittest.makeSuite, which is
    # deprecated and no longer exists in recent Python 3 releases.
    loader = unittest.defaultTestLoader
    res = unittest.TestSuite()

    for case_class in (ComparatorTests,
                       BtreeExceptionsTestCase,
                       BtreeKeyCompareTestCase):
        res.addTest(loader.loadTestsFromTestCase(case_class))
    return res
|
255 |
|
if __name__ == '__main__':
    # defaultTest must name an attribute of this module; the suite factory
    # defined here is test_suite(), not suite().
    unittest.main(defaultTest='test_suite')