summaryrefslogtreecommitdiff
path: root/lib/python2.7/bsddb/test/test_all.py
diff options
context:
space:
mode:
authorrahulp132020-03-17 14:55:41 +0530
committerrahulp132020-03-17 14:55:41 +0530
commit296443137f4288cb030e92859ccfbe3204bc1088 (patch)
treeca4798c2da1e7244edc3bc108d81b462b537aea2 /lib/python2.7/bsddb/test/test_all.py
parent0db48f6533517ecebfd9f0693f89deca28408b76 (diff)
downloadKiCad-eSim-296443137f4288cb030e92859ccfbe3204bc1088.tar.gz
KiCad-eSim-296443137f4288cb030e92859ccfbe3204bc1088.tar.bz2
KiCad-eSim-296443137f4288cb030e92859ccfbe3204bc1088.zip
initial commit
Diffstat (limited to 'lib/python2.7/bsddb/test/test_all.py')
-rw-r--r--lib/python2.7/bsddb/test/test_all.py620
1 files changed, 620 insertions, 0 deletions
diff --git a/lib/python2.7/bsddb/test/test_all.py b/lib/python2.7/bsddb/test/test_all.py
new file mode 100644
index 0000000..529dfad
--- /dev/null
+++ b/lib/python2.7/bsddb/test/test_all.py
@@ -0,0 +1,620 @@
+"""Run all test cases.
+"""
+
+import sys
+import os
+import unittest
+try:
+ # For Pythons w/distutils pybsddb
+ import bsddb3 as bsddb
+except ImportError:
+ # For Python 2.3
+ import bsddb
+
+
+if sys.version_info[0] >= 3 :
+ charset = "iso8859-1" # Full 8 bit
+
+ class logcursor_py3k(object) :
+ def __init__(self, env) :
+ self._logcursor = env.log_cursor()
+
+ def __getattr__(self, v) :
+ return getattr(self._logcursor, v)
+
+ def __next__(self) :
+ v = getattr(self._logcursor, "next")()
+ if v is not None :
+ v = (v[0], v[1].decode(charset))
+ return v
+
+ next = __next__
+
+ def first(self) :
+ v = self._logcursor.first()
+ if v is not None :
+ v = (v[0], v[1].decode(charset))
+ return v
+
+ def last(self) :
+ v = self._logcursor.last()
+ if v is not None :
+ v = (v[0], v[1].decode(charset))
+ return v
+
+ def prev(self) :
+ v = self._logcursor.prev()
+ if v is not None :
+ v = (v[0], v[1].decode(charset))
+ return v
+
+ def current(self) :
+ v = self._logcursor.current()
+ if v is not None :
+ v = (v[0], v[1].decode(charset))
+ return v
+
+ def set(self, lsn) :
+ v = self._logcursor.set(lsn)
+ if v is not None :
+ v = (v[0], v[1].decode(charset))
+ return v
+
+ class cursor_py3k(object) :
+ def __init__(self, db, *args, **kwargs) :
+ self._dbcursor = db.cursor(*args, **kwargs)
+
+ def __getattr__(self, v) :
+ return getattr(self._dbcursor, v)
+
+ def _fix(self, v) :
+ if v is None : return None
+ key, value = v
+ if isinstance(key, bytes) :
+ key = key.decode(charset)
+ return (key, value.decode(charset))
+
+ def __next__(self) :
+ v = getattr(self._dbcursor, "next")()
+ return self._fix(v)
+
+ next = __next__
+
+ def previous(self) :
+ v = self._dbcursor.previous()
+ return self._fix(v)
+
+ def last(self) :
+ v = self._dbcursor.last()
+ return self._fix(v)
+
+ def set(self, k) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ v = self._dbcursor.set(k)
+ return self._fix(v)
+
+ def set_recno(self, num) :
+ v = self._dbcursor.set_recno(num)
+ return self._fix(v)
+
+ def set_range(self, k, dlen=-1, doff=-1) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ v = self._dbcursor.set_range(k, dlen=dlen, doff=doff)
+ return self._fix(v)
+
+ def dup(self, flags=0) :
+ cursor = self._dbcursor.dup(flags)
+ return dup_cursor_py3k(cursor)
+
+ def next_dup(self) :
+ v = self._dbcursor.next_dup()
+ return self._fix(v)
+
+ def next_nodup(self) :
+ v = self._dbcursor.next_nodup()
+ return self._fix(v)
+
+ def put(self, key, data, flags=0, dlen=-1, doff=-1) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ if isinstance(data, str) :
+ value = bytes(data, charset)
+ return self._dbcursor.put(key, data, flags=flags, dlen=dlen,
+ doff=doff)
+
+ def current(self, flags=0, dlen=-1, doff=-1) :
+ v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff)
+ return self._fix(v)
+
+ def first(self) :
+ v = self._dbcursor.first()
+ return self._fix(v)
+
+ def pget(self, key=None, data=None, flags=0) :
+ # Incorrect because key can be a bare number,
+ # but enough to pass testsuite
+ if isinstance(key, int) and (data is None) and (flags == 0) :
+ flags = key
+ key = None
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ if isinstance(data, int) and (flags==0) :
+ flags = data
+ data = None
+ if isinstance(data, str) :
+ data = bytes(data, charset)
+ v=self._dbcursor.pget(key=key, data=data, flags=flags)
+ if v is not None :
+ v1, v2, v3 = v
+ if isinstance(v1, bytes) :
+ v1 = v1.decode(charset)
+ if isinstance(v2, bytes) :
+ v2 = v2.decode(charset)
+
+ v = (v1, v2, v3.decode(charset))
+
+ return v
+
+ def join_item(self) :
+ v = self._dbcursor.join_item()
+ if v is not None :
+ v = v.decode(charset)
+ return v
+
+ def get(self, *args, **kwargs) :
+ l = len(args)
+ if l == 2 :
+ k, f = args
+ if isinstance(k, str) :
+ k = bytes(k, "iso8859-1")
+ args = (k, f)
+ elif l == 3 :
+ k, d, f = args
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ if isinstance(d, str) :
+ d = bytes(d, charset)
+ args =(k, d, f)
+
+ v = self._dbcursor.get(*args, **kwargs)
+ if v is not None :
+ k, v = v
+ if isinstance(k, bytes) :
+ k = k.decode(charset)
+ v = (k, v.decode(charset))
+ return v
+
+ def get_both(self, key, value) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ if isinstance(value, str) :
+ value = bytes(value, charset)
+ v=self._dbcursor.get_both(key, value)
+ return self._fix(v)
+
+ class dup_cursor_py3k(cursor_py3k) :
+ def __init__(self, dbcursor) :
+ self._dbcursor = dbcursor
+
+ class DB_py3k(object) :
+ def __init__(self, *args, **kwargs) :
+ args2=[]
+ for i in args :
+ if isinstance(i, DBEnv_py3k) :
+ i = i._dbenv
+ args2.append(i)
+ args = tuple(args2)
+ for k, v in kwargs.items() :
+ if isinstance(v, DBEnv_py3k) :
+ kwargs[k] = v._dbenv
+
+ self._db = bsddb._db.DB_orig(*args, **kwargs)
+
+ def __contains__(self, k) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ return getattr(self._db, "has_key")(k)
+
+ def __getitem__(self, k) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ v = self._db[k]
+ if v is not None :
+ v = v.decode(charset)
+ return v
+
+ def __setitem__(self, k, v) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ if isinstance(v, str) :
+ v = bytes(v, charset)
+ self._db[k] = v
+
+ def __delitem__(self, k) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ del self._db[k]
+
+ def __getattr__(self, v) :
+ return getattr(self._db, v)
+
+ def __len__(self) :
+ return len(self._db)
+
+ def has_key(self, k, txn=None) :
+ if isinstance(k, str) :
+ k = bytes(k, charset)
+ return self._db.has_key(k, txn=txn)
+
+ def set_re_delim(self, c) :
+ if isinstance(c, str) : # We can use a numeric value byte too
+ c = bytes(c, charset)
+ return self._db.set_re_delim(c)
+
+ def set_re_pad(self, c) :
+ if isinstance(c, str) : # We can use a numeric value byte too
+ c = bytes(c, charset)
+ return self._db.set_re_pad(c)
+
+ def get_re_source(self) :
+ source = self._db.get_re_source()
+ return source.decode(charset)
+
+ def put(self, key, data, txn=None, flags=0, dlen=-1, doff=-1) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ if isinstance(data, str) :
+ value = bytes(data, charset)
+ return self._db.put(key, data, flags=flags, txn=txn, dlen=dlen,
+ doff=doff)
+
+ def append(self, value, txn=None) :
+ if isinstance(value, str) :
+ value = bytes(value, charset)
+ return self._db.append(value, txn=txn)
+
+ def get_size(self, key) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ return self._db.get_size(key)
+
+ def exists(self, key, *args, **kwargs) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ return self._db.exists(key, *args, **kwargs)
+
+ def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ if default != "MagicCookie" : # Magic for 'test_get_none.py'
+ v=self._db.get(key, default=default, txn=txn, flags=flags,
+ dlen=dlen, doff=doff)
+ else :
+ v=self._db.get(key, txn=txn, flags=flags,
+ dlen=dlen, doff=doff)
+ if (v is not None) and isinstance(v, bytes) :
+ v = v.decode(charset)
+ return v
+
+ def pget(self, key, txn=None) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ v=self._db.pget(key, txn=txn)
+ if v is not None :
+ v1, v2 = v
+ if isinstance(v1, bytes) :
+ v1 = v1.decode(charset)
+
+ v = (v1, v2.decode(charset))
+ return v
+
+ def get_both(self, key, value, txn=None, flags=0) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ if isinstance(value, str) :
+ value = bytes(value, charset)
+ v=self._db.get_both(key, value, txn=txn, flags=flags)
+ if v is not None :
+ v = v.decode(charset)
+ return v
+
+ def delete(self, key, txn=None) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ return self._db.delete(key, txn=txn)
+
+ def keys(self) :
+ k = self._db.keys()
+ if len(k) and isinstance(k[0], bytes) :
+ return [i.decode(charset) for i in self._db.keys()]
+ else :
+ return k
+
+ def items(self) :
+ data = self._db.items()
+ if not len(data) : return data
+ data2 = []
+ for k, v in data :
+ if isinstance(k, bytes) :
+ k = k.decode(charset)
+ data2.append((k, v.decode(charset)))
+ return data2
+
+ def associate(self, secondarydb, callback, flags=0, txn=None) :
+ class associate_callback(object) :
+ def __init__(self, callback) :
+ self._callback = callback
+
+ def callback(self, key, data) :
+ if isinstance(key, str) :
+ key = key.decode(charset)
+ data = data.decode(charset)
+ key = self._callback(key, data)
+ if (key != bsddb._db.DB_DONOTINDEX) :
+ if isinstance(key, str) :
+ key = bytes(key, charset)
+ elif isinstance(key, list) :
+ key2 = []
+ for i in key :
+ if isinstance(i, str) :
+ i = bytes(i, charset)
+ key2.append(i)
+ key = key2
+ return key
+
+ return self._db.associate(secondarydb._db,
+ associate_callback(callback).callback, flags=flags,
+ txn=txn)
+
+ def cursor(self, txn=None, flags=0) :
+ return cursor_py3k(self._db, txn=txn, flags=flags)
+
+ def join(self, cursor_list) :
+ cursor_list = [i._dbcursor for i in cursor_list]
+ return dup_cursor_py3k(self._db.join(cursor_list))
+
+ class DBEnv_py3k(object) :
+ def __init__(self, *args, **kwargs) :
+ self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs)
+
+ def __getattr__(self, v) :
+ return getattr(self._dbenv, v)
+
+ def log_cursor(self, flags=0) :
+ return logcursor_py3k(self._dbenv)
+
+ def get_lg_dir(self) :
+ return self._dbenv.get_lg_dir().decode(charset)
+
+ def get_tmp_dir(self) :
+ return self._dbenv.get_tmp_dir().decode(charset)
+
+ def get_data_dirs(self) :
+ return tuple(
+ (i.decode(charset) for i in self._dbenv.get_data_dirs()))
+
+ class DBSequence_py3k(object) :
+ def __init__(self, db, *args, **kwargs) :
+ self._db=db
+ self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs)
+
+ def __getattr__(self, v) :
+ return getattr(self._dbsequence, v)
+
+ def open(self, key, *args, **kwargs) :
+ return self._dbsequence.open(bytes(key, charset), *args, **kwargs)
+
+ def get_key(self) :
+ return self._dbsequence.get_key().decode(charset)
+
+ def get_dbp(self) :
+ return self._db
+
+ bsddb._db.DBEnv_orig = bsddb._db.DBEnv
+ bsddb._db.DB_orig = bsddb._db.DB
+ if bsddb.db.version() <= (4, 3) :
+ bsddb._db.DBSequence_orig = None
+ else :
+ bsddb._db.DBSequence_orig = bsddb._db.DBSequence
+
+ def do_proxy_db_py3k(flag) :
+ flag2 = do_proxy_db_py3k.flag
+ do_proxy_db_py3k.flag = flag
+ if flag :
+ bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k
+ bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k
+ bsddb._db.DBSequence = DBSequence_py3k
+ else :
+ bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig
+ bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig
+ bsddb._db.DBSequence = bsddb._db.DBSequence_orig
+ return flag2
+
+ do_proxy_db_py3k.flag = False
+ do_proxy_db_py3k(True)
+
+try:
+ # For Pythons w/distutils pybsddb
+ from bsddb3 import db, dbtables, dbutils, dbshelve, \
+ hashopen, btopen, rnopen, dbobj
+except ImportError:
+ # For Python 2.3
+ from bsddb import db, dbtables, dbutils, dbshelve, \
+ hashopen, btopen, rnopen, dbobj
+
+try:
+ from bsddb3 import test_support
+except ImportError:
+ if sys.version_info[0] < 3 :
+ from test import test_support
+ else :
+ from test import support as test_support
+
+
+try:
+ if sys.version_info[0] < 3 :
+ from threading import Thread, currentThread
+ del Thread, currentThread
+ else :
+ from threading import Thread, current_thread
+ del Thread, current_thread
+ have_threads = True
+except ImportError:
+ have_threads = False
+
+verbose = 0
+if 'verbose' in sys.argv:
+ verbose = 1
+ sys.argv.remove('verbose')
+
+if 'silent' in sys.argv: # take care of old flag, just in case
+ verbose = 0
+ sys.argv.remove('silent')
+
+
+def print_versions():
+ print
+ print '-=' * 38
+ print db.DB_VERSION_STRING
+ print 'bsddb.db.version(): %s' % (db.version(), )
+ if db.version() >= (5, 0) :
+ print 'bsddb.db.full_version(): %s' %repr(db.full_version())
+ print 'bsddb.db.__version__: %s' % db.__version__
+ print 'bsddb.db.cvsid: %s' % db.cvsid
+
+ # Workaround for allowing generating an EGGs as a ZIP files.
+ suffix="__"
+ print 'py module: %s' % getattr(bsddb, "__file"+suffix)
+ print 'extension module: %s' % getattr(bsddb, "__file"+suffix)
+
+ print 'python version: %s' % sys.version
+ print 'My pid: %s' % os.getpid()
+ print '-=' * 38
+
+
+def get_new_path(name) :
+ get_new_path.mutex.acquire()
+ try :
+ import os
+ path=os.path.join(get_new_path.prefix,
+ name+"_"+str(os.getpid())+"_"+str(get_new_path.num))
+ get_new_path.num+=1
+ finally :
+ get_new_path.mutex.release()
+ return path
+
+def get_new_environment_path() :
+ path=get_new_path("environment")
+ import os
+ try:
+ os.makedirs(path,mode=0700)
+ except os.error:
+ test_support.rmtree(path)
+ os.makedirs(path)
+ return path
+
+def get_new_database_path() :
+ path=get_new_path("database")
+ import os
+ if os.path.exists(path) :
+ os.remove(path)
+ return path
+
+
+# This path can be overridden via "set_test_path_prefix()".
+import os, os.path
+get_new_path.prefix=os.path.join(os.environ.get("TMPDIR",
+ os.path.join(os.sep,"tmp")), "z-Berkeley_DB")
+get_new_path.num=0
+
+def get_test_path_prefix() :
+ return get_new_path.prefix
+
+def set_test_path_prefix(path) :
+ get_new_path.prefix=path
+
+def remove_test_path_directory() :
+ test_support.rmtree(get_new_path.prefix)
+
+if have_threads :
+ import threading
+ get_new_path.mutex=threading.Lock()
+ del threading
+else :
+ class Lock(object) :
+ def acquire(self) :
+ pass
+ def release(self) :
+ pass
+ get_new_path.mutex=Lock()
+ del Lock
+
+
+
+class PrintInfoFakeTest(unittest.TestCase):
+ def testPrintVersions(self):
+ print_versions()
+
+
+# This little hack is for when this module is run as main and all the
+# other modules import it so they will still be able to get the right
+# verbose setting. It's confusing but it works.
+if sys.version_info[0] < 3 :
+ import test_all
+ test_all.verbose = verbose
+else :
+ import sys
+ print >>sys.stderr, "Work to do!"
+
+
+def suite(module_prefix='', timing_check=None):
+ test_modules = [
+ 'test_associate',
+ 'test_basics',
+ 'test_dbenv',
+ 'test_db',
+ 'test_compare',
+ 'test_compat',
+ 'test_cursor_pget_bug',
+ 'test_dbobj',
+ 'test_dbshelve',
+ 'test_dbtables',
+ 'test_distributed_transactions',
+ 'test_early_close',
+ 'test_fileid',
+ 'test_get_none',
+ 'test_join',
+ 'test_lock',
+ 'test_misc',
+ 'test_pickle',
+ 'test_queue',
+ 'test_recno',
+ 'test_replication',
+ 'test_sequence',
+ 'test_thread',
+ ]
+
+ alltests = unittest.TestSuite()
+ for name in test_modules:
+ #module = __import__(name)
+ # Do it this way so that suite may be called externally via
+ # python's Lib/test/test_bsddb3.
+ module = __import__(module_prefix+name, globals(), locals(), name)
+
+ alltests.addTest(module.test_suite())
+ if timing_check:
+ alltests.addTest(unittest.makeSuite(timing_check))
+ return alltests
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(PrintInfoFakeTest))
+ return suite
+
+
+if __name__ == '__main__':
+ print_versions()
+ unittest.main(defaultTest='suite')