summaryrefslogtreecommitdiff
path: root/lib/python2.7/bsddb/test/test_recno.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/bsddb/test/test_recno.py')
-rw-r--r--lib/python2.7/bsddb/test/test_recno.py319
1 files changed, 319 insertions, 0 deletions
diff --git a/lib/python2.7/bsddb/test/test_recno.py b/lib/python2.7/bsddb/test/test_recno.py
new file mode 100644
index 0000000..b0e30de
--- /dev/null
+++ b/lib/python2.7/bsddb/test/test_recno.py
@@ -0,0 +1,319 @@
+"""TestCases for exercising a Recno DB.
+"""
+
+import os, sys
+import errno
+from pprint import pprint
+import string
+import unittest
+
+from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
+
+
+#----------------------------------------------------------------------
+
+class SimpleRecnoTestCase(unittest.TestCase):
+ if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
+ (sys.version_info < (3, 2))) :
+ def assertIsInstance(self, obj, datatype, msg=None) :
+ return self.assertEqual(type(obj), datatype, msg=msg)
+ def assertGreaterEqual(self, a, b, msg=None) :
+ return self.assertTrue(a>=b, msg=msg)
+
+
+ def setUp(self):
+ self.filename = get_new_database_path()
+ self.homeDir = None
+
+ def tearDown(self):
+ test_support.unlink(self.filename)
+ if self.homeDir:
+ test_support.rmtree(self.homeDir)
+
+ def test01_basic(self):
+ d = db.DB()
+
+ get_returns_none = d.set_get_returns_none(2)
+ d.set_get_returns_none(get_returns_none)
+
+ d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
+
+ for x in string.ascii_letters:
+ recno = d.append(x * 60)
+ self.assertIsInstance(recno, int)
+ self.assertGreaterEqual(recno, 1)
+ if verbose:
+ print recno,
+
+ if verbose: print
+
+ stat = d.stat()
+ if verbose:
+ pprint(stat)
+
+ for recno in range(1, len(d)+1):
+ data = d[recno]
+ if verbose:
+ print data
+
+ self.assertIsInstance(data, str)
+ self.assertEqual(data, d.get(recno))
+
+ try:
+ data = d[0] # This should raise a KeyError!?!?!
+ except db.DBInvalidArgError, val:
+ if sys.version_info < (2, 6) :
+ self.assertEqual(val[0], db.EINVAL)
+ else :
+ self.assertEqual(val.args[0], db.EINVAL)
+ if verbose: print val
+ else:
+ self.fail("expected exception")
+
+ # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
+ try:
+ d.has_key(0)
+ except db.DBError, val:
+ pass
+ else:
+ self.fail("has_key did not raise a proper exception")
+
+ try:
+ data = d[100]
+ except KeyError:
+ pass
+ else:
+ self.fail("expected exception")
+
+ try:
+ data = d.get(100)
+ except db.DBNotFoundError, val:
+ if get_returns_none:
+ self.fail("unexpected exception")
+ else:
+ self.assertEqual(data, None)
+
+ keys = d.keys()
+ if verbose:
+ print keys
+ self.assertIsInstance(keys, list)
+ self.assertIsInstance(keys[0], int)
+ self.assertEqual(len(keys), len(d))
+
+ items = d.items()
+ if verbose:
+ pprint(items)
+ self.assertIsInstance(items, list)
+ self.assertIsInstance(items[0], tuple)
+ self.assertEqual(len(items[0]), 2)
+ self.assertIsInstance(items[0][0], int)
+ self.assertIsInstance(items[0][1], str)
+ self.assertEqual(len(items), len(d))
+
+ self.assertTrue(d.has_key(25))
+
+ del d[25]
+ self.assertFalse(d.has_key(25))
+
+ d.delete(13)
+ self.assertFalse(d.has_key(13))
+
+ data = d.get_both(26, "z" * 60)
+ self.assertEqual(data, "z" * 60, 'was %r' % data)
+ if verbose:
+ print data
+
+ fd = d.fd()
+ if verbose:
+ print fd
+
+ c = d.cursor()
+ rec = c.first()
+ while rec:
+ if verbose:
+ print rec
+ rec = c.next()
+
+ c.set(50)
+ rec = c.current()
+ if verbose:
+ print rec
+
+ c.put(-1, "a replacement record", db.DB_CURRENT)
+
+ c.set(50)
+ rec = c.current()
+ self.assertEqual(rec, (50, "a replacement record"))
+ if verbose:
+ print rec
+
+ rec = c.set_range(30)
+ if verbose:
+ print rec
+
+ # test that non-existent key lookups work (and that
+ # DBC_set_range doesn't have a memleak under valgrind)
+ rec = c.set_range(999999)
+ self.assertEqual(rec, None)
+ if verbose:
+ print rec
+
+ c.close()
+ d.close()
+
+ d = db.DB()
+ d.open(self.filename)
+ c = d.cursor()
+
+ # put a record beyond the consecutive end of the recno's
+ d[100] = "way out there"
+ self.assertEqual(d[100], "way out there")
+
+ try:
+ data = d[99]
+ except KeyError:
+ pass
+ else:
+ self.fail("expected exception")
+
+ try:
+ d.get(99)
+ except db.DBKeyEmptyError, val:
+ if get_returns_none:
+ self.fail("unexpected DBKeyEmptyError exception")
+ else:
+ if sys.version_info < (2, 6) :
+ self.assertEqual(val[0], db.DB_KEYEMPTY)
+ else :
+ self.assertEqual(val.args[0], db.DB_KEYEMPTY)
+ if verbose: print val
+ else:
+ if not get_returns_none:
+ self.fail("expected exception")
+
+ rec = c.set(40)
+ while rec:
+ if verbose:
+ print rec
+ rec = c.next()
+
+ c.close()
+ d.close()
+
+ def test02_WithSource(self):
+ """
+ A Recno file that is given a "backing source file" is essentially a
+ simple ASCII file. Normally each record is delimited by \n and so is
+ just a line in the file, but you can set a different record delimiter
+ if needed.
+ """
+ homeDir = get_new_environment_path()
+ self.homeDir = homeDir
+ source = os.path.join(homeDir, 'test_recno.txt')
+ if not os.path.isdir(homeDir):
+ os.mkdir(homeDir)
+ f = open(source, 'w') # create the file
+ f.close()
+
+ d = db.DB()
+ # This is the default value, just checking if both int
+ d.set_re_delim(0x0A)
+ d.set_re_delim('\n') # and char can be used...
+ d.set_re_source(source)
+ d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
+
+ data = "The quick brown fox jumped over the lazy dog".split()
+ for datum in data:
+ d.append(datum)
+ d.sync()
+ d.close()
+
+ # get the text from the backing source
+ f = open(source, 'r')
+ text = f.read()
+ f.close()
+ text = text.strip()
+ if verbose:
+ print text
+ print data
+ print text.split('\n')
+
+ self.assertEqual(text.split('\n'), data)
+
+ # open as a DB again
+ d = db.DB()
+ d.set_re_source(source)
+ d.open(self.filename, db.DB_RECNO)
+
+ d[3] = 'reddish-brown'
+ d[8] = 'comatose'
+
+ d.sync()
+ d.close()
+
+ f = open(source, 'r')
+ text = f.read()
+ f.close()
+ text = text.strip()
+ if verbose:
+ print text
+ print text.split('\n')
+
+ self.assertEqual(text.split('\n'),
+ "The quick reddish-brown fox jumped over the comatose dog".split())
+
+ def test03_FixedLength(self):
+ d = db.DB()
+ d.set_re_len(40) # fixed length records, 40 bytes long
+ d.set_re_pad('-') # sets the pad character...
+ d.set_re_pad(45) # ...test both int and char
+ d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
+
+ for x in string.ascii_letters:
+ d.append(x * 35) # These will be padded
+
+ d.append('.' * 40) # this one will be exact
+
+ try: # this one will fail
+ d.append('bad' * 20)
+ except db.DBInvalidArgError, val:
+ if sys.version_info < (2, 6) :
+ self.assertEqual(val[0], db.EINVAL)
+ else :
+ self.assertEqual(val.args[0], db.EINVAL)
+ if verbose: print val
+ else:
+ self.fail("expected exception")
+
+ c = d.cursor()
+ rec = c.first()
+ while rec:
+ if verbose:
+ print rec
+ rec = c.next()
+
+ c.close()
+ d.close()
+
+ def test04_get_size_empty(self) :
+ d = db.DB()
+ d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE)
+
+ row_id = d.append(' ')
+ self.assertEqual(1, d.get_size(key=row_id))
+ row_id = d.append('')
+ self.assertEqual(0, d.get_size(key=row_id))
+
+
+
+
+
+#----------------------------------------------------------------------
+
+
+def test_suite():
+ return unittest.makeSuite(SimpleRecnoTestCase)
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')