|
1 """Miscellaneous bsddb module test cases |
|
2 """ |
|
3 |
|
4 import os |
|
5 import unittest |
|
6 |
|
7 from test_all import db, dbshelve, hashopen, test_support, get_new_environment_path, get_new_database_path |
|
8 |
|
9 #---------------------------------------------------------------------- |
|
10 |
|
11 class MiscTestCase(unittest.TestCase): |
|
12 def setUp(self): |
|
13 self.filename = self.__class__.__name__ + '.db' |
|
14 self.homeDir = get_new_environment_path() |
|
15 |
|
16 def tearDown(self): |
|
17 test_support.unlink(self.filename) |
|
18 test_support.rmtree(self.homeDir) |
|
19 |
|
20 def test01_badpointer(self): |
|
21 dbs = dbshelve.open(self.filename) |
|
22 dbs.close() |
|
23 self.assertRaises(db.DBError, dbs.get, "foo") |
|
24 |
|
25 def test02_db_home(self): |
|
26 env = db.DBEnv() |
|
27 # check for crash fixed when db_home is used before open() |
|
28 self.assert_(env.db_home is None) |
|
29 env.open(self.homeDir, db.DB_CREATE) |
|
30 import sys |
|
31 if sys.version_info[0] < 3 : |
|
32 self.assertEqual(self.homeDir, env.db_home) |
|
33 else : |
|
34 self.assertEqual(bytes(self.homeDir, "ascii"), env.db_home) |
|
35 |
|
36 def test03_repr_closed_db(self): |
|
37 db = hashopen(self.filename) |
|
38 db.close() |
|
39 rp = repr(db) |
|
40 self.assertEquals(rp, "{}") |
|
41 |
|
42 def test04_repr_db(self) : |
|
43 db = hashopen(self.filename) |
|
44 d = {} |
|
45 for i in xrange(100) : |
|
46 db[repr(i)] = repr(100*i) |
|
47 d[repr(i)] = repr(100*i) |
|
48 db.close() |
|
49 db = hashopen(self.filename) |
|
50 rp = repr(db) |
|
51 self.assertEquals(rp, repr(d)) |
|
52 db.close() |
|
53 |
|
54 # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900 |
|
55 # |
|
56 # See the bug report for details. |
|
57 # |
|
58 # The problem was that make_key_dbt() was not allocating a copy of |
|
59 # string keys but FREE_DBT() was always being told to free it when the |
|
60 # database was opened with DB_THREAD. |
|
61 def test05_double_free_make_key_dbt(self): |
|
62 try: |
|
63 db1 = db.DB() |
|
64 db1.open(self.filename, None, db.DB_BTREE, |
|
65 db.DB_CREATE | db.DB_THREAD) |
|
66 |
|
67 curs = db1.cursor() |
|
68 t = curs.get("/foo", db.DB_SET) |
|
69 # double free happened during exit from DBC_get |
|
70 finally: |
|
71 db1.close() |
|
72 test_support.unlink(self.filename) |
|
73 |
|
74 def test06_key_with_null_bytes(self): |
|
75 try: |
|
76 db1 = db.DB() |
|
77 db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE) |
|
78 db1['a'] = 'eh?' |
|
79 db1['a\x00'] = 'eh zed.' |
|
80 db1['a\x00a'] = 'eh zed eh?' |
|
81 db1['aaa'] = 'eh eh eh!' |
|
82 keys = db1.keys() |
|
83 keys.sort() |
|
84 self.assertEqual(['a', 'a\x00', 'a\x00a', 'aaa'], keys) |
|
85 self.assertEqual(db1['a'], 'eh?') |
|
86 self.assertEqual(db1['a\x00'], 'eh zed.') |
|
87 self.assertEqual(db1['a\x00a'], 'eh zed eh?') |
|
88 self.assertEqual(db1['aaa'], 'eh eh eh!') |
|
89 finally: |
|
90 db1.close() |
|
91 test_support.unlink(self.filename) |
|
92 |
|
93 def test07_DB_set_flags_persists(self): |
|
94 if db.version() < (4,2): |
|
95 # The get_flags API required for this to work is only available |
|
96 # in Berkeley DB >= 4.2 |
|
97 return |
|
98 try: |
|
99 db1 = db.DB() |
|
100 db1.set_flags(db.DB_DUPSORT) |
|
101 db1.open(self.filename, db.DB_HASH, db.DB_CREATE) |
|
102 db1['a'] = 'eh' |
|
103 db1['a'] = 'A' |
|
104 self.assertEqual([('a', 'A')], db1.items()) |
|
105 db1.put('a', 'Aa') |
|
106 self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items()) |
|
107 db1.close() |
|
108 db1 = db.DB() |
|
109 # no set_flags call, we're testing that it reads and obeys |
|
110 # the flags on open. |
|
111 db1.open(self.filename, db.DB_HASH) |
|
112 self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items()) |
|
113 # if it read the flags right this will replace all values |
|
114 # for key 'a' instead of adding a new one. (as a dict should) |
|
115 db1['a'] = 'new A' |
|
116 self.assertEqual([('a', 'new A')], db1.items()) |
|
117 finally: |
|
118 db1.close() |
|
119 test_support.unlink(self.filename) |
|
120 |
|
121 |
|
122 #---------------------------------------------------------------------- |
|
123 |
|
124 |
|
125 def test_suite(): |
|
126 return unittest.makeSuite(MiscTestCase) |
|
127 |
|
128 |
|
129 if __name__ == '__main__': |
|
130 unittest.main(defaultTest='test_suite') |