symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_all.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """Run all test cases.
       
     2 """
       
     3 
       
     4 import sys
       
     5 import os
       
     6 import unittest
       
     7 try:
       
     8     # For Pythons w/distutils pybsddb
       
     9     import bsddb3 as bsddb
       
    10 except ImportError:
       
    11     # For Python 2.3
       
    12     import bsddb
       
    13 
       
    14 
       
    15 if sys.version_info[0] >= 3 :
       
    16     charset = "iso8859-1"  # Full 8 bit
       
    17 
       
    class cursor_py3k(object):
        """Cursor proxy that lets Python 3 test code work with ``str``.

        ``str`` keys and values are encoded with the module-level ``charset``
        before they reach the wrapped cursor, and ``bytes`` coming back are
        decoded with the same charset.  Attributes not defined on this class
        are forwarded to the wrapped cursor.
        """

        def __init__(self, db, *args, **kwargs):
            # All arguments are passed untouched to db.cursor().
            self._dbcursor = db.cursor(*args, **kwargs)

        def __getattr__(self, v):
            # Only reached for names this class does not define itself.
            return getattr(self._dbcursor, v)

        @staticmethod
        def _to_bytes(v):
            """Encode a ``str`` with ``charset``; return anything else as is."""
            if isinstance(v, str):
                return bytes(v, charset)
            return v

        def _fix(self, v):
            """Decode a ``(key, value)`` result; ``None`` passes through.

            The key is decoded only when it is ``bytes``; the value is always
            decoded.
            """
            if v is None:
                return None
            key, value = v
            if isinstance(key, bytes):
                key = key.decode(charset)
            return (key, value.decode(charset))

        def __next__(self):
            # NOTE(review): the method is looked up by name instead of being
            # written as ".next()" -- presumably so that 2to3 leaves the call
            # alone; confirm before simplifying.
            v = getattr(self._dbcursor, "next")()
            return self._fix(v)

        next = __next__

        def previous(self):
            return self._fix(self._dbcursor.previous())

        def last(self):
            return self._fix(self._dbcursor.last())

        def set(self, k):
            return self._fix(self._dbcursor.set(self._to_bytes(k)))

        def set_recno(self, num):
            return self._fix(self._dbcursor.set_recno(num))

        def set_range(self, k, dlen=-1, doff=-1):
            v = self._dbcursor.set_range(self._to_bytes(k), dlen=dlen,
                    doff=doff)
            return self._fix(v)

        def dup(self, flags=0):
            # Wrap the duplicated native cursor so it keeps the str interface.
            return dup_cursor_py3k(self._dbcursor.dup(flags))

        def next_dup(self):
            return self._fix(self._dbcursor.next_dup())

        def next_nodup(self):
            return self._fix(self._dbcursor.next_nodup())

        def put(self, key, value, flags=0, dlen=-1, doff=-1):
            return self._dbcursor.put(self._to_bytes(key),
                    self._to_bytes(value), flags=flags, dlen=dlen, doff=doff)

        def current(self, flags=0, dlen=-1, doff=-1):
            v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff)
            return self._fix(v)

        def first(self):
            return self._fix(self._dbcursor.first())

        def pget(self, key=None, data=None, flags=0):
            """Call the wrapped ``pget`` and decode the 3-tuple it returns.

            A lone integer in ``key`` (or in ``data`` while ``flags`` is 0)
            is taken to be the flags value.
            """
            # Incorrect because key can be a bare number,
            # but enough to pass testsuite
            if isinstance(key, int) and (data is None) and (flags == 0):
                flags = key
                key = None
            key = self._to_bytes(key)
            if isinstance(data, int) and (flags == 0):
                flags = data
                data = None
            data = self._to_bytes(data)
            v = self._dbcursor.pget(key=key, data=data, flags=flags)
            if v is not None:
                v1, v2, v3 = v
                if isinstance(v1, bytes):
                    v1 = v1.decode(charset)
                if isinstance(v2, bytes):
                    v2 = v2.decode(charset)
                v = (v1, v2, v3.decode(charset))
            return v

        def join_item(self):
            v = self._dbcursor.join_item()
            if v is not None:
                v = v.decode(charset)
            return v

        def get(self, *args, **kwargs):
            """Call the wrapped ``get`` and decode the pair it returns.

            With two positional arguments they are read as ``(key, flags)``,
            with three as ``(key, data, flags)``; ``str`` key/data are
            encoded.  Any other call shape is forwarded unchanged.
            """
            count = len(args)
            if count == 2:
                k, f = args
                args = (self._to_bytes(k), f)
            elif count == 3:
                k, d, f = args
                args = (self._to_bytes(k), self._to_bytes(d), f)

            return self._fix(self._dbcursor.get(*args, **kwargs))

        def get_both(self, key, value):
            v = self._dbcursor.get_both(self._to_bytes(key),
                    self._to_bytes(value))
            return self._fix(v)
       
   151 
       
   152     class dup_cursor_py3k(cursor_py3k) :
       
   153         def __init__(self, dbcursor) :
       
   154             self._dbcursor = dbcursor
       
   155 
       
   156     class DB_py3k(object) :
       
   157         def __init__(self, *args, **kwargs) :
       
   158             args2=[]
       
   159             for i in args :
       
   160                 if isinstance(i, DBEnv_py3k) :
       
   161                     i = i._dbenv
       
   162                 args2.append(i)
       
   163             args = tuple(args2)
       
   164             for k, v in kwargs.items() :
       
   165                 if isinstance(v, DBEnv_py3k) :
       
   166                     kwargs[k] = v._dbenv
       
   167 
       
   168             self._db = bsddb._db.DB_orig(*args, **kwargs)
       
   169 
       
   170         def __contains__(self, k) :
       
   171             if isinstance(k, str) :
       
   172                 k = bytes(k, charset)
       
   173             return getattr(self._db, "has_key")(k)
       
   174 
       
   175         def __getitem__(self, k) :
       
   176             if isinstance(k, str) :
       
   177                 k = bytes(k, charset)
       
   178             v = self._db[k]
       
   179             if v != None :
       
   180                 v = v.decode(charset)
       
   181             return v
       
   182 
       
   183         def __setitem__(self, k, v) :
       
   184             if isinstance(k, str) :
       
   185                 k = bytes(k, charset)
       
   186             if isinstance(v, str) :
       
   187                 v = bytes(v, charset)
       
   188             self._db[k] = v
       
   189 
       
   190         def __delitem__(self, k) :
       
   191             if isinstance(k, str) :
       
   192                 k = bytes(k, charset)
       
   193             del self._db[k]
       
   194 
       
   195         def __getattr__(self, v) :
       
   196             return getattr(self._db, v)
       
   197 
       
   198         def __len__(self) :
       
   199             return len(self._db)
       
   200 
       
   201         def has_key(self, k, txn=None) :
       
   202             if isinstance(k, str) :
       
   203                 k = bytes(k, charset)
       
   204             return self._db.has_key(k, txn=txn)
       
   205 
       
   206         def put(self, key, value, txn=None, flags=0, dlen=-1, doff=-1) :
       
   207             if isinstance(key, str) :
       
   208                 key = bytes(key, charset)
       
   209             if isinstance(value, str) :
       
   210                 value = bytes(value, charset)
       
   211             return self._db.put(key, value, flags=flags, txn=txn, dlen=dlen,
       
   212                     doff=doff)
       
   213 
       
   214         def append(self, value, txn=None) :
       
   215             if isinstance(value, str) :
       
   216                 value = bytes(value, charset)
       
   217             return self._db.append(value, txn=txn)
       
   218 
       
   219         def get_size(self, key) :
       
   220             if isinstance(key, str) :
       
   221                 key = bytes(key, charset)
       
   222             return self._db.get_size(key)
       
   223 
       
   224         def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) :
       
   225             if isinstance(key, str) :
       
   226                 key = bytes(key, charset)
       
   227             if default != "MagicCookie" :  # Magic for 'test_get_none.py'
       
   228                 v=self._db.get(key, default=default, txn=txn, flags=flags,
       
   229                         dlen=dlen, doff=doff)
       
   230             else :
       
   231                 v=self._db.get(key, txn=txn, flags=flags,
       
   232                         dlen=dlen, doff=doff)
       
   233             if (v != None) and isinstance(v, bytes) :
       
   234                 v = v.decode(charset)
       
   235             return v
       
   236 
       
   237         def pget(self, key, txn=None) :
       
   238             if isinstance(key, str) :
       
   239                 key = bytes(key, charset)
       
   240             v=self._db.pget(key, txn=txn)
       
   241             if v != None :
       
   242                 v1, v2 = v
       
   243                 if isinstance(v1, bytes) :
       
   244                     v1 = v1.decode(charset)
       
   245 
       
   246                 v = (v1, v2.decode(charset))
       
   247             return v
       
   248 
       
   249         def get_both(self, key, value, txn=None, flags=0) :
       
   250             if isinstance(key, str) :
       
   251                 key = bytes(key, charset)
       
   252             if isinstance(value, str) :
       
   253                 value = bytes(value, charset)
       
   254             v=self._db.get_both(key, value, txn=txn, flags=flags)
       
   255             if v != None :
       
   256                 v = v.decode(charset)
       
   257             return v
       
   258 
       
   259         def delete(self, key, txn=None) :
       
   260             if isinstance(key, str) :
       
   261                 key = bytes(key, charset)
       
   262             return self._db.delete(key, txn=txn)
       
   263 
       
   264         def keys(self) :
       
   265             k = self._db.keys()
       
   266             if len(k) and isinstance(k[0], bytes) :
       
   267                 return [i.decode(charset) for i in self._db.keys()]
       
   268             else :
       
   269                 return k
       
   270 
       
   271         def items(self) :
       
   272             data = self._db.items()
       
   273             if not len(data) : return data
       
   274             data2 = []
       
   275             for k, v in data :
       
   276                 if isinstance(k, bytes) :
       
   277                     k = k.decode(charset)
       
   278                 data2.append((k, v.decode(charset)))
       
   279             return data2
       
   280 
       
   281         def associate(self, secondarydb, callback, flags=0, txn=None) :
       
   282             class associate_callback(object) :
       
   283                 def __init__(self, callback) :
       
   284                     self._callback = callback
       
   285 
       
   286                 def callback(self, key, data) :
       
   287                     if isinstance(key, str) :
       
   288                         key = key.decode(charset)
       
   289                     data = data.decode(charset)
       
   290                     key = self._callback(key, data)
       
   291                     if (key != bsddb._db.DB_DONOTINDEX) and isinstance(key,
       
   292                             str) :
       
   293                         key = bytes(key, charset)
       
   294                     return key
       
   295 
       
   296             return self._db.associate(secondarydb._db,
       
   297                     associate_callback(callback).callback, flags=flags, txn=txn)
       
   298 
       
   299         def cursor(self, txn=None, flags=0) :
       
   300             return cursor_py3k(self._db, txn=txn, flags=flags)
       
   301 
       
   302         def join(self, cursor_list) :
       
   303             cursor_list = [i._dbcursor for i in cursor_list]
       
   304             return dup_cursor_py3k(self._db.join(cursor_list))
       
   305 
       
   306     class DBEnv_py3k(object) :
       
   307         def __init__(self, *args, **kwargs) :
       
   308             self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs)
       
   309 
       
   310         def __getattr__(self, v) :
       
   311             return getattr(self._dbenv, v)
       
   312 
       
   313     class DBSequence_py3k(object) :
       
   314         def __init__(self, db, *args, **kwargs) :
       
   315             self._db=db
       
   316             self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs)
       
   317 
       
   318         def __getattr__(self, v) :
       
   319             return getattr(self._dbsequence, v)
       
   320 
       
   321         def open(self, key, *args, **kwargs) :
       
   322             return self._dbsequence.open(bytes(key, charset), *args, **kwargs)
       
   323 
       
   324         def get_key(self) :
       
   325             return  self._dbsequence.get_key().decode(charset)
       
   326 
       
   327         def get_dbp(self) :
       
   328             return self._db
       
   329 
       
   330     import string
       
   331     string.letters=[chr(i) for i in xrange(65,91)]
       
   332 
       
   333     bsddb._db.DBEnv_orig = bsddb._db.DBEnv
       
   334     bsddb._db.DB_orig = bsddb._db.DB
       
   335     bsddb._db.DBSequence_orig = bsddb._db.DBSequence
       
   336 
       
   337     def do_proxy_db_py3k(flag) :
       
   338         flag2 = do_proxy_db_py3k.flag
       
   339         do_proxy_db_py3k.flag = flag
       
   340         if flag :
       
   341             bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k
       
   342             bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k
       
   343             bsddb._db.DBSequence = DBSequence_py3k
       
   344         else :
       
   345             bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig
       
   346             bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig
       
   347             bsddb._db.DBSequence = bsddb._db.DBSequence_orig
       
   348         return flag2
       
   349 
       
   350     do_proxy_db_py3k.flag = False
       
   351     do_proxy_db_py3k(True)
       
   352 
       
   353 try:
       
   354     # For Pythons w/distutils pybsddb
       
   355     from bsddb3 import db, dbtables, dbutils, dbshelve, \
       
   356             hashopen, btopen, rnopen, dbobj
       
   357 except ImportError:
       
   358     # For Python 2.3
       
   359     from bsddb import db, dbtables, dbutils, dbshelve, \
       
   360             hashopen, btopen, rnopen, dbobj
       
   361 
       
   362 try:
       
   363     from bsddb3 import test_support
       
   364 except ImportError:
       
   365     if sys.version_info[0] < 3 :
       
   366         from test import test_support
       
   367     else :
       
   368         from test import support as test_support
       
   369 
       
   370 
       
   371 try:
       
   372     if sys.version_info[0] < 3 :
       
   373         from threading import Thread, currentThread
       
   374         del Thread, currentThread
       
   375     else :
       
   376         from threading import Thread, current_thread
       
   377         del Thread, current_thread
       
   378     have_threads = True
       
   379 except ImportError:
       
   380     have_threads = False
       
   381 
       
   382 verbose = 0
       
   383 if 'verbose' in sys.argv:
       
   384     verbose = 1
       
   385     sys.argv.remove('verbose')
       
   386 
       
   387 if 'silent' in sys.argv:  # take care of old flag, just in case
       
   388     verbose = 0
       
   389     sys.argv.remove('silent')
       
   390 
       
   391 
       
   392 def print_versions():
       
   393     print
       
   394     print '-=' * 38
       
   395     print db.DB_VERSION_STRING
       
   396     print 'bsddb.db.version():   %s' % (db.version(), )
       
   397     print 'bsddb.db.__version__: %s' % db.__version__
       
   398     print 'bsddb.db.cvsid:       %s' % db.cvsid
       
   399     print 'py module:            %s' % bsddb.__file__
       
   400     print 'extension module:     %s' % bsddb._bsddb.__file__
       
   401     print 'python version:       %s' % sys.version
       
   402     print 'My pid:               %s' % os.getpid()
       
   403     print '-=' * 38
       
   404 
       
   405 
       
   406 def get_new_path(name) :
       
   407     get_new_path.mutex.acquire()
       
   408     try :
       
   409         import os
       
   410         path=os.path.join(get_new_path.prefix,
       
   411                 name+"_"+str(os.getpid())+"_"+str(get_new_path.num))
       
   412         get_new_path.num+=1
       
   413     finally :
       
   414         get_new_path.mutex.release()
       
   415     return path
       
   416 
       
   417 def get_new_environment_path() :
       
   418     path=get_new_path("environment")
       
   419     import os
       
   420     try:
       
   421         os.makedirs(path,mode=0700)
       
   422     except os.error:
       
   423         test_support.rmtree(path)
       
   424         os.makedirs(path)
       
   425     return path
       
   426 
       
   427 def get_new_database_path() :
       
   428     path=get_new_path("database")
       
   429     import os
       
   430     if os.path.exists(path) :
       
   431         os.remove(path)
       
   432     return path
       
   433 
       
   434 
       
   435 # This path can be overridden via "set_test_path_prefix()".
       
   436 import os, os.path
       
   437 get_new_path.prefix=os.path.join(os.sep,"tmp","z-Berkeley_DB")
       
   438 get_new_path.num=0
       
   439 
       
   440 def get_test_path_prefix() :
       
   441     return get_new_path.prefix
       
   442 
       
   443 def set_test_path_prefix(path) :
       
   444     get_new_path.prefix=path
       
   445 
       
   446 def remove_test_path_directory() :
       
   447     test_support.rmtree(get_new_path.prefix)
       
   448 
       
   449 if have_threads :
       
   450     import threading
       
   451     get_new_path.mutex=threading.Lock()
       
   452     del threading
       
   453 else :
       
   454     class Lock(object) :
       
   455         def acquire(self) :
       
   456             pass
       
   457         def release(self) :
       
   458             pass
       
   459     get_new_path.mutex=Lock()
       
   460     del Lock
       
   461 
       
   462 
       
   463 
       
   464 class PrintInfoFakeTest(unittest.TestCase):
       
   465     def testPrintVersions(self):
       
   466         print_versions()
       
   467 
       
   468 
       
   469 # This little hack is for when this module is run as main and all the
       
   470 # other modules import it so they will still be able to get the right
       
   471 # verbose setting.  It's confusing but it works.
       
   472 if sys.version_info[0] < 3 :
       
   473     import test_all
       
   474     test_all.verbose = verbose
       
   475 else :
       
   476     import sys
       
   477     print >>sys.stderr, "Work to do!"
       
   478 
       
   479 
       
   480 def suite(module_prefix='', timing_check=None):
       
   481     test_modules = [
       
   482         'test_associate',
       
   483         'test_basics',
       
   484         'test_compare',
       
   485         'test_compat',
       
   486         'test_cursor_pget_bug',
       
   487         'test_dbobj',
       
   488         'test_dbshelve',
       
   489         'test_dbtables',
       
   490         'test_distributed_transactions',
       
   491         'test_early_close',
       
   492         'test_get_none',
       
   493         'test_join',
       
   494         'test_lock',
       
   495         'test_misc',
       
   496         'test_pickle',
       
   497         'test_queue',
       
   498         'test_recno',
       
   499         'test_replication',
       
   500         'test_sequence',
       
   501         'test_thread',
       
   502         ]
       
   503 
       
   504     alltests = unittest.TestSuite()
       
   505     for name in test_modules:
       
   506         #module = __import__(name)
       
   507         # Do it this way so that suite may be called externally via
       
   508         # python's Lib/test/test_bsddb3.
       
   509         module = __import__(module_prefix+name, globals(), locals(), name)
       
   510 
       
   511         alltests.addTest(module.test_suite())
       
   512         if timing_check:
       
   513             alltests.addTest(unittest.makeSuite(timing_check))
       
   514     return alltests
       
   515 
       
   516 
       
   517 def test_suite():
       
   518     suite = unittest.TestSuite()
       
   519     suite.addTest(unittest.makeSuite(PrintInfoFakeTest))
       
   520     return suite
       
   521 
       
   522 
       
   523 if __name__ == '__main__':
       
   524     print_versions()
       
   525     unittest.main(defaultTest='suite')