symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_dbshelve.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """
       
     2 TestCases for checking dbShelve objects.
       
     3 """
       
     4 
       
     5 import os, string
       
     6 import random
       
     7 import unittest
       
     8 
       
     9 
       
    10 from test_all import db, dbshelve, test_support, verbose, \
       
    11         get_new_environment_path, get_new_database_path
       
    12 
       
    13 
       
    14 
       
    15 #----------------------------------------------------------------------
       
    16 
       
    17 # We want the objects to be comparable so we can test dbshelve.values
       
    18 # later on.
       
    19 class DataClass:
       
    20     def __init__(self):
       
    21         self.value = random.random()
       
    22 
       
    23     def __repr__(self) :  # For Python 3.0 comparison
       
    24         return "DataClass %f" %self.value
       
    25 
       
    26     def __cmp__(self, other):  # For Python 2.x comparison
       
    27         return cmp(self.value, other)
       
    28 
       
    29 
       
    30 class DBShelveTestCase(unittest.TestCase):
       
    31     def setUp(self):
       
    32         import sys
       
    33         if sys.version_info[0] >= 3 :
       
    34             from test_all import do_proxy_db_py3k
       
    35             self._flag_proxy_db_py3k = do_proxy_db_py3k(False)
       
    36         self.filename = get_new_database_path()
       
    37         self.do_open()
       
    38 
       
    39     def tearDown(self):
       
    40         import sys
       
    41         if sys.version_info[0] >= 3 :
       
    42             from test_all import do_proxy_db_py3k
       
    43             do_proxy_db_py3k(self._flag_proxy_db_py3k)
       
    44         self.do_close()
       
    45         test_support.unlink(self.filename)
       
    46 
       
    47     def mk(self, key):
       
    48         """Turn key into an appropriate key type for this db"""
       
    49         # override in child class for RECNO
       
    50         import sys
       
    51         if sys.version_info[0] < 3 :
       
    52             return key
       
    53         else :
       
    54             return bytes(key, "iso8859-1")  # 8 bits
       
    55 
       
    56     def populateDB(self, d):
       
    57         for x in string.letters:
       
    58             d[self.mk('S' + x)] = 10 * x           # add a string
       
    59             d[self.mk('I' + x)] = ord(x)           # add an integer
       
    60             d[self.mk('L' + x)] = [x] * 10         # add a list
       
    61 
       
    62             inst = DataClass()            # add an instance
       
    63             inst.S = 10 * x
       
    64             inst.I = ord(x)
       
    65             inst.L = [x] * 10
       
    66             d[self.mk('O' + x)] = inst
       
    67 
       
    68 
       
    69     # overridable in derived classes to affect how the shelf is created/opened
       
    70     def do_open(self):
       
    71         self.d = dbshelve.open(self.filename)
       
    72 
       
    73     # and closed...
       
    74     def do_close(self):
       
    75         self.d.close()
       
    76 
       
    77 
       
    78 
       
    79     def test01_basics(self):
       
    80         if verbose:
       
    81             print '\n', '-=' * 30
       
    82             print "Running %s.test01_basics..." % self.__class__.__name__
       
    83 
       
    84         self.populateDB(self.d)
       
    85         self.d.sync()
       
    86         self.do_close()
       
    87         self.do_open()
       
    88         d = self.d
       
    89 
       
    90         l = len(d)
       
    91         k = d.keys()
       
    92         s = d.stat()
       
    93         f = d.fd()
       
    94 
       
    95         if verbose:
       
    96             print "length:", l
       
    97             print "keys:", k
       
    98             print "stats:", s
       
    99 
       
   100         self.assertEqual(0, d.has_key(self.mk('bad key')))
       
   101         self.assertEqual(1, d.has_key(self.mk('IA')))
       
   102         self.assertEqual(1, d.has_key(self.mk('OA')))
       
   103 
       
   104         d.delete(self.mk('IA'))
       
   105         del d[self.mk('OA')]
       
   106         self.assertEqual(0, d.has_key(self.mk('IA')))
       
   107         self.assertEqual(0, d.has_key(self.mk('OA')))
       
   108         self.assertEqual(len(d), l-2)
       
   109 
       
   110         values = []
       
   111         for key in d.keys():
       
   112             value = d[key]
       
   113             values.append(value)
       
   114             if verbose:
       
   115                 print "%s: %s" % (key, value)
       
   116             self.checkrec(key, value)
       
   117 
       
   118         dbvalues = d.values()
       
   119         self.assertEqual(len(dbvalues), len(d.keys()))
       
   120         import sys
       
   121         if sys.version_info[0] < 3 :
       
   122             values.sort()
       
   123             dbvalues.sort()
       
   124             self.assertEqual(values, dbvalues)
       
   125         else :  # XXX: Convert all to strings. Please, improve
       
   126             values.sort(key=lambda x : str(x))
       
   127             dbvalues.sort(key=lambda x : str(x))
       
   128             self.assertEqual(repr(values), repr(dbvalues))
       
   129 
       
   130         items = d.items()
       
   131         self.assertEqual(len(items), len(values))
       
   132 
       
   133         for key, value in items:
       
   134             self.checkrec(key, value)
       
   135 
       
   136         self.assertEqual(d.get(self.mk('bad key')), None)
       
   137         self.assertEqual(d.get(self.mk('bad key'), None), None)
       
   138         self.assertEqual(d.get(self.mk('bad key'), 'a string'), 'a string')
       
   139         self.assertEqual(d.get(self.mk('bad key'), [1, 2, 3]), [1, 2, 3])
       
   140 
       
   141         d.set_get_returns_none(0)
       
   142         self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key'))
       
   143         d.set_get_returns_none(1)
       
   144 
       
   145         d.put(self.mk('new key'), 'new data')
       
   146         self.assertEqual(d.get(self.mk('new key')), 'new data')
       
   147         self.assertEqual(d[self.mk('new key')], 'new data')
       
   148 
       
   149 
       
   150 
       
   151     def test02_cursors(self):
       
   152         if verbose:
       
   153             print '\n', '-=' * 30
       
   154             print "Running %s.test02_cursors..." % self.__class__.__name__
       
   155 
       
   156         self.populateDB(self.d)
       
   157         d = self.d
       
   158 
       
   159         count = 0
       
   160         c = d.cursor()
       
   161         rec = c.first()
       
   162         while rec is not None:
       
   163             count = count + 1
       
   164             if verbose:
       
   165                 print rec
       
   166             key, value = rec
       
   167             self.checkrec(key, value)
       
   168             # Hack to avoid conversion by 2to3 tool
       
   169             rec = getattr(c, "next")()
       
   170         del c
       
   171 
       
   172         self.assertEqual(count, len(d))
       
   173 
       
   174         count = 0
       
   175         c = d.cursor()
       
   176         rec = c.last()
       
   177         while rec is not None:
       
   178             count = count + 1
       
   179             if verbose:
       
   180                 print rec
       
   181             key, value = rec
       
   182             self.checkrec(key, value)
       
   183             rec = c.prev()
       
   184 
       
   185         self.assertEqual(count, len(d))
       
   186 
       
   187         c.set(self.mk('SS'))
       
   188         key, value = c.current()
       
   189         self.checkrec(key, value)
       
   190         del c
       
   191 
       
   192 
       
   193     def test03_append(self):
       
   194         # NOTE: this is overridden in RECNO subclass, don't change its name.
       
   195         if verbose:
       
   196             print '\n', '-=' * 30
       
   197             print "Running %s.test03_append..." % self.__class__.__name__
       
   198 
       
   199         self.assertRaises(dbshelve.DBShelveError,
       
   200                           self.d.append, 'unit test was here')
       
   201 
       
   202 
       
   203     def checkrec(self, key, value):
       
   204         # override this in a subclass if the key type is different
       
   205 
       
   206         import sys
       
   207         if sys.version_info[0] >= 3 :
       
   208             if isinstance(key, bytes) :
       
   209                 key = key.decode("iso8859-1")  # 8 bits
       
   210 
       
   211         x = key[1]
       
   212         if key[0] == 'S':
       
   213             self.assertEqual(type(value), str)
       
   214             self.assertEqual(value, 10 * x)
       
   215 
       
   216         elif key[0] == 'I':
       
   217             self.assertEqual(type(value), int)
       
   218             self.assertEqual(value, ord(x))
       
   219 
       
   220         elif key[0] == 'L':
       
   221             self.assertEqual(type(value), list)
       
   222             self.assertEqual(value, [x] * 10)
       
   223 
       
   224         elif key[0] == 'O':
       
   225             import sys
       
   226             if sys.version_info[0] < 3 :
       
   227                 from types import InstanceType
       
   228                 self.assertEqual(type(value), InstanceType)
       
   229             else :
       
   230                 self.assertEqual(type(value), DataClass)
       
   231 
       
   232             self.assertEqual(value.S, 10 * x)
       
   233             self.assertEqual(value.I, ord(x))
       
   234             self.assertEqual(value.L, [x] * 10)
       
   235 
       
   236         else:
       
   237             self.assert_(0, 'Unknown key type, fix the test')
       
   238 
       
   239 #----------------------------------------------------------------------
       
   240 
       
   241 class BasicShelveTestCase(DBShelveTestCase):
       
   242     def do_open(self):
       
   243         self.d = dbshelve.DBShelf()
       
   244         self.d.open(self.filename, self.dbtype, self.dbflags)
       
   245 
       
   246     def do_close(self):
       
   247         self.d.close()
       
   248 
       
   249 
       
   250 class BTreeShelveTestCase(BasicShelveTestCase):
       
   251     dbtype = db.DB_BTREE
       
   252     dbflags = db.DB_CREATE
       
   253 
       
   254 
       
   255 class HashShelveTestCase(BasicShelveTestCase):
       
   256     dbtype = db.DB_HASH
       
   257     dbflags = db.DB_CREATE
       
   258 
       
   259 
       
   260 class ThreadBTreeShelveTestCase(BasicShelveTestCase):
       
   261     dbtype = db.DB_BTREE
       
   262     dbflags = db.DB_CREATE | db.DB_THREAD
       
   263 
       
   264 
       
   265 class ThreadHashShelveTestCase(BasicShelveTestCase):
       
   266     dbtype = db.DB_HASH
       
   267     dbflags = db.DB_CREATE | db.DB_THREAD
       
   268 
       
   269 
       
   270 #----------------------------------------------------------------------
       
   271 
       
   272 class BasicEnvShelveTestCase(DBShelveTestCase):
       
   273     def do_open(self):
       
   274         self.env = db.DBEnv()
       
   275         self.env.open(self.homeDir,
       
   276                 self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
       
   277 
       
   278         self.filename = os.path.split(self.filename)[1]
       
   279         self.d = dbshelve.DBShelf(self.env)
       
   280         self.d.open(self.filename, self.dbtype, self.dbflags)
       
   281 
       
   282 
       
   283     def do_close(self):
       
   284         self.d.close()
       
   285         self.env.close()
       
   286 
       
   287 
       
   288     def setUp(self) :
       
   289         self.homeDir = get_new_environment_path()
       
   290         DBShelveTestCase.setUp(self)
       
   291 
       
   292     def tearDown(self):
       
   293         import sys
       
   294         if sys.version_info[0] >= 3 :
       
   295             from test_all import do_proxy_db_py3k
       
   296             do_proxy_db_py3k(self._flag_proxy_db_py3k)
       
   297         self.do_close()
       
   298         test_support.rmtree(self.homeDir)
       
   299 
       
   300 
       
   301 class EnvBTreeShelveTestCase(BasicEnvShelveTestCase):
       
   302     envflags = 0
       
   303     dbtype = db.DB_BTREE
       
   304     dbflags = db.DB_CREATE
       
   305 
       
   306 
       
   307 class EnvHashShelveTestCase(BasicEnvShelveTestCase):
       
   308     envflags = 0
       
   309     dbtype = db.DB_HASH
       
   310     dbflags = db.DB_CREATE
       
   311 
       
   312 
       
   313 class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase):
       
   314     envflags = db.DB_THREAD
       
   315     dbtype = db.DB_BTREE
       
   316     dbflags = db.DB_CREATE | db.DB_THREAD
       
   317 
       
   318 
       
   319 class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
       
   320     envflags = db.DB_THREAD
       
   321     dbtype = db.DB_HASH
       
   322     dbflags = db.DB_CREATE | db.DB_THREAD
       
   323 
       
   324 
       
   325 #----------------------------------------------------------------------
       
   326 # test cases for a DBShelf in a RECNO DB.
       
   327 
       
   328 class RecNoShelveTestCase(BasicShelveTestCase):
       
   329     dbtype = db.DB_RECNO
       
   330     dbflags = db.DB_CREATE
       
   331 
       
   332     def setUp(self):
       
   333         BasicShelveTestCase.setUp(self)
       
   334 
       
   335         # pool to assign integer key values out of
       
   336         self.key_pool = list(range(1, 5000))
       
   337         self.key_map = {}     # map string keys to the number we gave them
       
   338         self.intkey_map = {}  # reverse map of above
       
   339 
       
   340     def mk(self, key):
       
   341         if key not in self.key_map:
       
   342             self.key_map[key] = self.key_pool.pop(0)
       
   343             self.intkey_map[self.key_map[key]] = key
       
   344         return self.key_map[key]
       
   345 
       
   346     def checkrec(self, intkey, value):
       
   347         key = self.intkey_map[intkey]
       
   348         BasicShelveTestCase.checkrec(self, key, value)
       
   349 
       
   350     def test03_append(self):
       
   351         if verbose:
       
   352             print '\n', '-=' * 30
       
   353             print "Running %s.test03_append..." % self.__class__.__name__
       
   354 
       
   355         self.d[1] = 'spam'
       
   356         self.d[5] = 'eggs'
       
   357         self.assertEqual(6, self.d.append('spam'))
       
   358         self.assertEqual(7, self.d.append('baked beans'))
       
   359         self.assertEqual('spam', self.d.get(6))
       
   360         self.assertEqual('spam', self.d.get(1))
       
   361         self.assertEqual('baked beans', self.d.get(7))
       
   362         self.assertEqual('eggs', self.d.get(5))
       
   363 
       
   364 
       
   365 #----------------------------------------------------------------------
       
   366 
       
   367 def test_suite():
       
   368     suite = unittest.TestSuite()
       
   369 
       
   370     suite.addTest(unittest.makeSuite(DBShelveTestCase))
       
   371     suite.addTest(unittest.makeSuite(BTreeShelveTestCase))
       
   372     suite.addTest(unittest.makeSuite(HashShelveTestCase))
       
   373     suite.addTest(unittest.makeSuite(ThreadBTreeShelveTestCase))
       
   374     suite.addTest(unittest.makeSuite(ThreadHashShelveTestCase))
       
   375     suite.addTest(unittest.makeSuite(EnvBTreeShelveTestCase))
       
   376     suite.addTest(unittest.makeSuite(EnvHashShelveTestCase))
       
   377     suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase))
       
   378     suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase))
       
   379     suite.addTest(unittest.makeSuite(RecNoShelveTestCase))
       
   380 
       
   381     return suite
       
   382 
       
   383 
       
   384 if __name__ == '__main__':
       
   385     unittest.main(defaultTest='test_suite')