|
1 """TestCases for exercising a Recno DB. |
|
2 """ |
|
3 |
|
4 import os |
|
5 import errno |
|
6 from pprint import pprint |
|
7 import unittest |
|
8 |
|
9 from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path |
|
10 |
|
# Alphabet used to generate record payloads: the 26 lower-case letters
# followed by the 26 upper-case letters (one record per letter).
letters = ('abcdefghijklmnopqrstuvwxyz'
           'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
|
12 |
|
13 |
|
14 #---------------------------------------------------------------------- |
|
15 |
|
16 class SimpleRecnoTestCase(unittest.TestCase): |
|
17 import sys |
|
18 if sys.version_info[:3] < (2, 4, 0): |
|
19 def assertFalse(self, expr, msg=None): |
|
20 self.failIf(expr,msg=msg) |
|
21 |
|
22 def setUp(self): |
|
23 self.filename = get_new_database_path() |
|
24 self.homeDir = None |
|
25 |
|
26 def tearDown(self): |
|
27 test_support.unlink(self.filename) |
|
28 if self.homeDir: |
|
29 test_support.rmtree(self.homeDir) |
|
30 |
|
31 def test01_basic(self): |
|
32 d = db.DB() |
|
33 |
|
34 get_returns_none = d.set_get_returns_none(2) |
|
35 d.set_get_returns_none(get_returns_none) |
|
36 |
|
37 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) |
|
38 |
|
39 for x in letters: |
|
40 recno = d.append(x * 60) |
|
41 self.assertEqual(type(recno), type(0)) |
|
42 self.assert_(recno >= 1) |
|
43 if verbose: |
|
44 print recno, |
|
45 |
|
46 if verbose: print |
|
47 |
|
48 stat = d.stat() |
|
49 if verbose: |
|
50 pprint(stat) |
|
51 |
|
52 for recno in range(1, len(d)+1): |
|
53 data = d[recno] |
|
54 if verbose: |
|
55 print data |
|
56 |
|
57 self.assertEqual(type(data), type("")) |
|
58 self.assertEqual(data, d.get(recno)) |
|
59 |
|
60 try: |
|
61 data = d[0] # This should raise a KeyError!?!?! |
|
62 except db.DBInvalidArgError, val: |
|
63 import sys |
|
64 if sys.version_info[0] < 3 : |
|
65 self.assertEqual(val[0], db.EINVAL) |
|
66 else : |
|
67 self.assertEqual(val.args[0], db.EINVAL) |
|
68 if verbose: print val |
|
69 else: |
|
70 self.fail("expected exception") |
|
71 |
|
72 # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2) |
|
73 try: |
|
74 d.has_key(0) |
|
75 except db.DBError, val: |
|
76 pass |
|
77 else: |
|
78 self.fail("has_key did not raise a proper exception") |
|
79 |
|
80 try: |
|
81 data = d[100] |
|
82 except KeyError: |
|
83 pass |
|
84 else: |
|
85 self.fail("expected exception") |
|
86 |
|
87 try: |
|
88 data = d.get(100) |
|
89 except db.DBNotFoundError, val: |
|
90 if get_returns_none: |
|
91 self.fail("unexpected exception") |
|
92 else: |
|
93 self.assertEqual(data, None) |
|
94 |
|
95 keys = d.keys() |
|
96 if verbose: |
|
97 print keys |
|
98 self.assertEqual(type(keys), type([])) |
|
99 self.assertEqual(type(keys[0]), type(123)) |
|
100 self.assertEqual(len(keys), len(d)) |
|
101 |
|
102 items = d.items() |
|
103 if verbose: |
|
104 pprint(items) |
|
105 self.assertEqual(type(items), type([])) |
|
106 self.assertEqual(type(items[0]), type(())) |
|
107 self.assertEqual(len(items[0]), 2) |
|
108 self.assertEqual(type(items[0][0]), type(123)) |
|
109 self.assertEqual(type(items[0][1]), type("")) |
|
110 self.assertEqual(len(items), len(d)) |
|
111 |
|
112 self.assert_(d.has_key(25)) |
|
113 |
|
114 del d[25] |
|
115 self.assertFalse(d.has_key(25)) |
|
116 |
|
117 d.delete(13) |
|
118 self.assertFalse(d.has_key(13)) |
|
119 |
|
120 data = d.get_both(26, "z" * 60) |
|
121 self.assertEqual(data, "z" * 60, 'was %r' % data) |
|
122 if verbose: |
|
123 print data |
|
124 |
|
125 fd = d.fd() |
|
126 if verbose: |
|
127 print fd |
|
128 |
|
129 c = d.cursor() |
|
130 rec = c.first() |
|
131 while rec: |
|
132 if verbose: |
|
133 print rec |
|
134 rec = c.next() |
|
135 |
|
136 c.set(50) |
|
137 rec = c.current() |
|
138 if verbose: |
|
139 print rec |
|
140 |
|
141 c.put(-1, "a replacement record", db.DB_CURRENT) |
|
142 |
|
143 c.set(50) |
|
144 rec = c.current() |
|
145 self.assertEqual(rec, (50, "a replacement record")) |
|
146 if verbose: |
|
147 print rec |
|
148 |
|
149 rec = c.set_range(30) |
|
150 if verbose: |
|
151 print rec |
|
152 |
|
153 # test that non-existant key lookups work (and that |
|
154 # DBC_set_range doesn't have a memleak under valgrind) |
|
155 rec = c.set_range(999999) |
|
156 self.assertEqual(rec, None) |
|
157 if verbose: |
|
158 print rec |
|
159 |
|
160 c.close() |
|
161 d.close() |
|
162 |
|
163 d = db.DB() |
|
164 d.open(self.filename) |
|
165 c = d.cursor() |
|
166 |
|
167 # put a record beyond the consecutive end of the recno's |
|
168 d[100] = "way out there" |
|
169 self.assertEqual(d[100], "way out there") |
|
170 |
|
171 try: |
|
172 data = d[99] |
|
173 except KeyError: |
|
174 pass |
|
175 else: |
|
176 self.fail("expected exception") |
|
177 |
|
178 try: |
|
179 d.get(99) |
|
180 except db.DBKeyEmptyError, val: |
|
181 if get_returns_none: |
|
182 self.fail("unexpected DBKeyEmptyError exception") |
|
183 else: |
|
184 self.assertEqual(val[0], db.DB_KEYEMPTY) |
|
185 if verbose: print val |
|
186 else: |
|
187 if not get_returns_none: |
|
188 self.fail("expected exception") |
|
189 |
|
190 rec = c.set(40) |
|
191 while rec: |
|
192 if verbose: |
|
193 print rec |
|
194 rec = c.next() |
|
195 |
|
196 c.close() |
|
197 d.close() |
|
198 |
|
199 def test02_WithSource(self): |
|
200 """ |
|
201 A Recno file that is given a "backing source file" is essentially a |
|
202 simple ASCII file. Normally each record is delimited by \n and so is |
|
203 just a line in the file, but you can set a different record delimiter |
|
204 if needed. |
|
205 """ |
|
206 homeDir = get_new_environment_path() |
|
207 self.homeDir = homeDir |
|
208 source = os.path.join(homeDir, 'test_recno.txt') |
|
209 if not os.path.isdir(homeDir): |
|
210 os.mkdir(homeDir) |
|
211 f = open(source, 'w') # create the file |
|
212 f.close() |
|
213 |
|
214 d = db.DB() |
|
215 # This is the default value, just checking if both int |
|
216 d.set_re_delim(0x0A) |
|
217 d.set_re_delim('\n') # and char can be used... |
|
218 d.set_re_source(source) |
|
219 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) |
|
220 |
|
221 data = "The quick brown fox jumped over the lazy dog".split() |
|
222 for datum in data: |
|
223 d.append(datum) |
|
224 d.sync() |
|
225 d.close() |
|
226 |
|
227 # get the text from the backing source |
|
228 text = open(source, 'r').read() |
|
229 text = text.strip() |
|
230 if verbose: |
|
231 print text |
|
232 print data |
|
233 print text.split('\n') |
|
234 |
|
235 self.assertEqual(text.split('\n'), data) |
|
236 |
|
237 # open as a DB again |
|
238 d = db.DB() |
|
239 d.set_re_source(source) |
|
240 d.open(self.filename, db.DB_RECNO) |
|
241 |
|
242 d[3] = 'reddish-brown' |
|
243 d[8] = 'comatose' |
|
244 |
|
245 d.sync() |
|
246 d.close() |
|
247 |
|
248 text = open(source, 'r').read() |
|
249 text = text.strip() |
|
250 if verbose: |
|
251 print text |
|
252 print text.split('\n') |
|
253 |
|
254 self.assertEqual(text.split('\n'), |
|
255 "The quick reddish-brown fox jumped over the comatose dog".split()) |
|
256 |
|
257 def test03_FixedLength(self): |
|
258 d = db.DB() |
|
259 d.set_re_len(40) # fixed length records, 40 bytes long |
|
260 d.set_re_pad('-') # sets the pad character... |
|
261 d.set_re_pad(45) # ...test both int and char |
|
262 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) |
|
263 |
|
264 for x in letters: |
|
265 d.append(x * 35) # These will be padded |
|
266 |
|
267 d.append('.' * 40) # this one will be exact |
|
268 |
|
269 try: # this one will fail |
|
270 d.append('bad' * 20) |
|
271 except db.DBInvalidArgError, val: |
|
272 import sys |
|
273 if sys.version_info[0] < 3 : |
|
274 self.assertEqual(val[0], db.EINVAL) |
|
275 else : |
|
276 self.assertEqual(val.args[0], db.EINVAL) |
|
277 if verbose: print val |
|
278 else: |
|
279 self.fail("expected exception") |
|
280 |
|
281 c = d.cursor() |
|
282 rec = c.first() |
|
283 while rec: |
|
284 if verbose: |
|
285 print rec |
|
286 rec = c.next() |
|
287 |
|
288 c.close() |
|
289 d.close() |
|
290 |
|
291 |
|
292 #---------------------------------------------------------------------- |
|
293 |
|
294 |
|
def test_suite():
    """Build and return the suite holding every SimpleRecnoTestCase test."""
    suite = unittest.makeSuite(SimpleRecnoTestCase)
    return suite
|
297 |
|
298 |
|
# Script entry point: when executed directly, run the suite that
# test_suite() builds.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')