|
1 #!/usr/bin/env python |
|
2 # |
|
3 #----------------------------------------------------------------------- |
|
4 # A test suite for the table interface built on bsddb.db |
|
5 #----------------------------------------------------------------------- |
|
6 # |
|
7 # Copyright (C) 2000, 2001 by Autonomous Zone Industries |
|
8 # Copyright (C) 2002 Gregory P. Smith |
|
9 # |
|
10 # March 20, 2000 |
|
11 # |
|
12 # License: This is free software. You may use this software for any |
|
13 # purpose including modification/redistribution, so long as |
|
14 # this header remains intact and that you do not claim any |
|
15 # rights of ownership or authorship of this software. This |
|
16 # software has been tested, but no warranty is expressed or |
|
17 # implied. |
|
18 # |
|
19 # -- Gregory P. Smith <greg@krypto.org> |
|
20 # |
|
21 # $Id: test_dbtables.py 66088 2008-08-31 14:00:51Z jesus.cea $ |
|
22 |
|
23 import os, re |
|
24 try: |
|
25 import cPickle |
|
26 pickle = cPickle |
|
27 except ImportError: |
|
28 import pickle |
|
29 |
|
30 import unittest |
|
31 from test_all import db, dbtables, test_support, verbose, \ |
|
32 get_new_environment_path, get_new_database_path |
|
33 |
|
34 #---------------------------------------------------------------------- |
|
35 |
|
class TableDBTestCase(unittest.TestCase):
    """Tests for the table interface provided by ``dbtables.bsdTableDB``.

    ``setUp`` opens a table database inside a newly obtained environment
    directory and ``tearDown`` closes it and removes that directory, so each
    test method works on its own database.

    The test methods are described with ``#`` comments rather than
    docstrings on purpose: unittest prints the first docstring line in place
    of the test name in verbose mode, and adding docstrings would change
    that output.
    """

    # Not referenced anywhere in this class (setUp opens 'tabletest.db');
    # kept because it is a public class attribute.
    db_name = 'test-table.db'

    #------------------------------------------------------------------
    # helpers
    #------------------------------------------------------------------

    def _is_py3k(self):
        """Return True when running on Python 3 or later."""
        import sys
        return sys.version_info[0] >= 3

    def _pickle_column(self, value):
        """Return *value* pickled (protocol 1) in the form stored in a column.

        On Python 3 the pickled bytes are decoded as iso8859-1, which maps
        every byte value to one character, so ``_unpickle_column`` can
        recover the original bytes exactly.
        """
        data = pickle.dumps(value, 1)
        if self._is_py3k():
            data = data.decode("iso8859-1")  # 8 bits
        return data

    def _unpickle_column(self, data):
        """Rebuild the object stored by ``_pickle_column`` from *data*."""
        if self._is_py3k():
            data = bytes(data, "iso8859-1")
        return pickle.loads(data)

    def _drop_if_exists(self, tabname):
        """Drop table *tabname*, ignoring any ``TableDBError`` from Drop."""
        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass

    #------------------------------------------------------------------
    # fixture
    #------------------------------------------------------------------

    def setUp(self):
        """Open a table database in a new environment directory."""
        if self._is_py3k():
            # The value returned here is handed back to do_proxy_db_py3k()
            # in tearDown -- presumably the previous proxy setting.
            from test_all import do_proxy_db_py3k
            self._flag_proxy_db_py3k = do_proxy_db_py3k(False)

        self.testHomeDir = get_new_environment_path()
        self.tdb = dbtables.bsdTableDB(
            filename='tabletest.db', dbhome=self.testHomeDir, create=1)

    def tearDown(self):
        """Close the table database and undo everything setUp did."""
        try:
            self.tdb.close()
        finally:
            # Clean up even when close() raises, so the proxy setting and
            # the directory do not leak into the tests that follow.
            if self._is_py3k():
                from test_all import do_proxy_db_py3k
                do_proxy_db_py3k(self._flag_proxy_db_py3k)
            test_support.rmtree(self.testHomeDir)

    #------------------------------------------------------------------
    # tests
    #------------------------------------------------------------------

    # Store one pickled float in a single-column table and read it back.
    def test01(self):
        tabname = "test01"
        colname = 'cool numbers'
        self._drop_if_exists(tabname)
        self.tdb.CreateTable(tabname, [colname])
        self.tdb.Insert(tabname, {colname: self._pickle_column(3.14159)})

        if verbose:
            self.tdb._db_print()

        values = self.tdb.Select(
            tabname, [colname], conditions={colname: None})

        colval = self._unpickle_column(values[0][colname])
        self.assertTrue(colval > 3.141)
        self.assertTrue(colval < 3.142)

    # Select rows with a callable condition applied to a pickled column.
    def test02(self):
        tabname = "test02"
        col0 = 'coolness factor'
        col1 = 'but can it fly?'
        col2 = 'Species'

        testinfo = [
            {col0: self._pickle_column(8), col1: 'no', col2: 'Penguin'},
            {col0: self._pickle_column(-1), col1: 'no', col2: 'Turkey'},
            {col0: self._pickle_column(9), col1: 'yes',
             col2: 'SR-71A Blackbird'},
        ]

        self._drop_if_exists(tabname)
        self.tdb.CreateTable(tabname, [col0, col1, col2])
        for row in testinfo:
            self.tdb.Insert(tabname, row)

        values = self.tdb.Select(
            tabname, [col2],
            conditions={col0: lambda x: self._unpickle_column(x) >= 8})

        # Exactly the two rows whose coolness factor is >= 8 must come back;
        # the order in which they are returned is not checked.
        self.assertEqual(len(values), 2)
        species = [row['Species'] for row in values]
        species.sort()
        self.assertEqual(species, ['Penguin', 'SR-71A Blackbird'], values)

    # Create/drop/re-create a table, reject unknown columns, then check
    # Select and Delete with regex, exact, prefix and callable conditions.
    def test03(self):
        tabname = "test03"
        self._drop_if_exists(tabname)
        if verbose:
            print('...before CreateTable...')
            self.tdb._db_print()
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
        if verbose:
            print('...after CreateTable...')
            self.tdb._db_print()
        self.tdb.Drop(tabname)
        if verbose:
            print('...after Drop...')
            self.tdb._db_print()
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])

        # 'f' is not a column of the table, so the insert must be refused.
        self.assertRaises(dbtables.TableDBError, self.tdb.Insert, tabname,
                          {'a': "",
                           'e': pickle.dumps([{4: 5, 6: 7}, 'foo'], 1),
                           'f': "Zero"})

        # A condition on the unknown column 'foo' must be refused as well.
        self.assertRaises(dbtables.TableDBError, self.tdb.Select,
                          tabname, [], conditions={'foo': '123'})

        self.tdb.Insert(tabname,
                        {'a': '42',
                         'b': "bad",
                         'c': "meep",
                         'e': 'Fuzzy wuzzy was a bear'})
        self.tdb.Insert(tabname,
                        {'a': '581750',
                         'b': "good",
                         'd': "bla",
                         'c': "black",
                         'e': 'fuzzy was here'})
        self.tdb.Insert(tabname,
                        {'a': '800000',
                         'b': "good",
                         'd': "bla",
                         'c': "black",
                         'e': 'Fuzzy wuzzy is a bear'})

        if verbose:
            self.tdb._db_print()

        # this should return two rows
        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
            conditions={'e': re.compile('wuzzy').search,
                        'a': re.compile('^[0-9]+$').match})
        self.assertEqual(len(values), 2)

        # now let's delete one of them and try again
        self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
        values = self.tdb.Select(
            tabname, ['a', 'd', 'b'],
            conditions={'e': dbtables.PrefixCond('Fuzzy')})
        self.assertEqual(len(values), 1)
        # the surviving row was inserted without a 'd' value
        self.assertEqual(values[0]['d'], None)

        values = self.tdb.Select(tabname, ['b'],
            conditions={'c': lambda c: c == 'meep'})
        self.assertEqual(len(values), 1)
        self.assertEqual(values[0]['b'], "bad")

    # Several conditions in one Select must all hold for a row to match.
    def test04_MultiCondSelect(self):
        tabname = "test04_MultiCondSelect"
        self._drop_if_exists(tabname)
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])

        # 'f' is not a column of the table, so the insert must be refused.
        self.assertRaises(dbtables.TableDBError, self.tdb.Insert, tabname,
                          {'a': "",
                           'e': pickle.dumps([{4: 5, 6: 7}, 'foo'], 1),
                           'f': "Zero"})

        self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
                                  'e': "E"})
        self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
                                  'e': "-E"})
        self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
                                  'e': "E-"})

        if verbose:
            self.tdb._db_print()

        # This select should return 0 rows.  It is designed to test
        # the bug identified and fixed in sourceforge bug # 590449
        # (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down
        # and supplying a fix!!  This one caused many headaches to say
        # the least...)
        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
            conditions={'e': dbtables.ExactCond('E'),
                        'a': dbtables.ExactCond('A'),
                        'd': dbtables.PrefixCond('-')
                       })
        self.assertEqual(len(values), 0, values)

    # CreateOrExtendTable must add new columns to an existing table.
    def test_CreateOrExtend(self):
        tabname = "test_CreateOrExtend"

        self.tdb.CreateOrExtendTable(
            tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])

        # 'is it Guinness?' is not a column yet, so this insert must fail.
        # assertRaises is used instead of try/self.fail()/bare except: the
        # bare except also caught the AssertionError raised by self.fail(),
        # so the check could never report a failure.
        self.assertRaises(dbtables.TableDBError, self.tdb.Insert, tabname,
                          {'taste': 'crap',
                           'filling': 'no',
                           'is it Guinness?': 'no'})

        self.tdb.CreateOrExtendTable(tabname,
                                     ['name', 'taste', 'is it Guinness?'])

        # these should both succeed as the table should contain the union
        # of both sets of columns.
        self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
                                  'is it Guinness?': 'no'})
        self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
                                  'is it Guinness?': 'yes',
                                  'name': 'Guinness'})

    # Exercise the condition objects: Cond, ExactCond, PrefixCond, LikeCond.
    def test_CondObjs(self):
        tabname = "test_CondObjs"

        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p'])

        self.tdb.Insert(tabname, {'a': "the letter A",
                                  'b': "the letter B",
                                  'c': "is for cookie"})
        self.tdb.Insert(tabname, {'a': "is for aardvark",
                                  'e': "the letter E",
                                  'c': "is for cookie",
                                  'd': "is for dog"})
        self.tdb.Insert(tabname, {'a': "the letter A",
                                  'e': "the letter E",
                                  'c': "is for cookie",
                                  'p': "is for Python"})

        # Two rows carry an 'e' value; only one of them has a 'p' value.
        values = self.tdb.Select(
            tabname, ['p', 'e'],
            conditions={'e': dbtables.PrefixCond('the l')})
        self.assertEqual(len(values), 2, values)
        self.assertEqual(values[0]['e'], values[1]['e'], values)
        self.assertNotEqual(values[0]['p'], values[1]['p'], values)

        values = self.tdb.Select(
            tabname, ['d', 'a'],
            conditions={'a': dbtables.LikeCond('%aardvark%')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['d'], "is for dog", values)
        self.assertEqual(values[0]['a'], "is for aardvark", values)

        # All column names (None) with one condition per column; only the
        # aardvark row is expected to satisfy every one of them.
        values = self.tdb.Select(tabname, None,
                                 {'b': dbtables.Cond(),
                                  'e': dbtables.LikeCond('%letter%'),
                                  'a': dbtables.PrefixCond('is'),
                                  'd': dbtables.ExactCond('is for dog'),
                                  'c': dbtables.PrefixCond('is for'),
                                  'p': lambda s: not s})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['d'], "is for dog", values)
        self.assertEqual(values[0]['a'], "is for aardvark", values)

    # Delete must cope with rows that lack a value in some column.
    def test_Delete(self):
        tabname = "test_Delete"
        self.tdb.CreateTable(tabname, ['x', 'y', 'z'])

        # prior to 2001-05-09 there was a bug where Delete() would
        # fail if it encountered any rows that did not have values in
        # every column.
        # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff@nic.fi)
        self.tdb.Insert(tabname, {'x': 'X1', 'y': 'Y1'})
        self.tdb.Insert(tabname, {'x': 'X2', 'y': 'Y2', 'z': 'Z2'})

        self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')})
        values = self.tdb.Select(tabname, ['y'],
                                 conditions={'x': dbtables.PrefixCond('X')})
        self.assertEqual(len(values), 0)

    # Modify must apply each mapping callable to the matching rows only.
    def test_Modify(self):
        tabname = "test_Modify"
        self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access'])

        self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc',
                                  'Type': 'Word', 'Access': '8'})
        self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'})
        self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'})

        def set_type(current):
            # Fill in a type only where the row has none.
            if current is None:
                return 'MP3'
            return current

        def increment_access(count):
            return str(int(count) + 1)

        def remove_value(value):
            return None

        self.tdb.Modify(tabname,
                        conditions={'Access': dbtables.ExactCond('0')},
                        mappings={'Access': remove_value})
        self.tdb.Modify(tabname,
                        conditions={'Name': dbtables.LikeCond('%MP3%')},
                        mappings={'Type': set_type})
        self.tdb.Modify(tabname,
                        conditions={'Name': dbtables.LikeCond('%')},
                        mappings={'Access': increment_access})

        # The string value in mappings isn't callable, so TypeError is
        # expected.
        self.assertRaises(TypeError, self.tdb.Modify, tabname,
                          conditions={'Name': dbtables.LikeCond('%')},
                          mappings={'Access': 'What is your quest?'})

        # Delete key in select conditions
        values = self.tdb.Select(
            tabname, None,
            conditions={'Type': dbtables.ExactCond('Unknown')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['Name'], None, values)
        self.assertEqual(values[0]['Access'], None, values)

        # Modify value by select conditions
        values = self.tdb.Select(
            tabname, None,
            conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['Type'], "MP3", values)
        self.assertEqual(values[0]['Access'], "2", values)

        # Make sure change applied only to select conditions
        values = self.tdb.Select(
            tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['Type'], "Word", values)
        self.assertEqual(values[0]['Access'], "9", values)
|
396 |
|
397 |
|
def test_suite():
    """Return a TestSuite holding the tests defined by TableDBTestCase."""
    # unittest.makeSuite() is deprecated and no longer available in recent
    # Python 3 releases; the loader call below gathers the same test methods
    # and works on both Python 2 and Python 3.
    suite = unittest.TestSuite()
    suite.addTest(
        unittest.TestLoader().loadTestsFromTestCase(TableDBTestCase))
    return suite
|
402 |
|
403 |
|
# When executed as a script, run the tests named by the module-level
# test_suite() function.
if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")