diff options
Diffstat (limited to 'lib/python2.7/bsddb/test/test_compare.py')
-rw-r--r-- | lib/python2.7/bsddb/test/test_compare.py | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/lib/python2.7/bsddb/test/test_compare.py b/lib/python2.7/bsddb/test/test_compare.py new file mode 100644 index 0000000..cb3b463 --- /dev/null +++ b/lib/python2.7/bsddb/test/test_compare.py @@ -0,0 +1,447 @@ +""" +TestCases for python DB duplicate and Btree key comparison function. +""" + +import sys, os, re +import test_all +from cStringIO import StringIO + +import unittest + +from test_all import db, dbshelve, test_support, \ + get_new_environment_path, get_new_database_path + + +# Needed for python 3. "cmp" vanished in 3.0.1 +def cmp(a, b) : + if a==b : return 0 + if a<b : return -1 + return 1 + +lexical_cmp = cmp + +def lowercase_cmp(left, right) : + return cmp(left.lower(), right.lower()) + +def make_reverse_comparator(cmp) : + def reverse(left, right, delegate=cmp) : + return - delegate(left, right) + return reverse + +_expected_lexical_test_data = ['', 'CCCP', 'a', 'aaa', 'b', 'c', 'cccce', 'ccccf'] +_expected_lowercase_test_data = ['', 'a', 'aaa', 'b', 'c', 'CC', 'cccce', 'ccccf', 'CCCP'] + +class ComparatorTests(unittest.TestCase) : + def comparator_test_helper(self, comparator, expected_data) : + data = expected_data[:] + + import sys + if sys.version_info < (2, 6) : + data.sort(cmp=comparator) + else : # Insertion Sort. Please, improve + data2 = [] + for i in data : + for j, k in enumerate(data2) : + r = comparator(k, i) + if r == 1 : + data2.insert(j, i) + break + else : + data2.append(i) + data = data2 + + self.assertEqual(data, expected_data, + "comparator `%s' is not right: %s vs. %s" + % (comparator, expected_data, data)) + def test_lexical_comparator(self) : + self.comparator_test_helper(lexical_cmp, _expected_lexical_test_data) + def test_reverse_lexical_comparator(self) : + rev = _expected_lexical_test_data[:] + rev.reverse() + self.comparator_test_helper(make_reverse_comparator(lexical_cmp), + rev) + def test_lowercase_comparator(self) : + self.comparator_test_helper(lowercase_cmp, + _expected_lowercase_test_data) + +class AbstractBtreeKeyCompareTestCase(unittest.TestCase) : + env = None + db = None + + if (sys.version_info < (2, 7)) or ((sys.version_info >= (3,0)) and + (sys.version_info < (3, 2))) : + def assertLess(self, a, b, msg=None) : + return self.assertTrue(a<b, msg=msg) + + def setUp(self) : + self.filename = self.__class__.__name__ + '.db' + self.homeDir = get_new_environment_path() + env = db.DBEnv() + env.open(self.homeDir, + db.DB_CREATE | db.DB_INIT_MPOOL + | db.DB_INIT_LOCK | db.DB_THREAD) + self.env = env + + def tearDown(self) : + self.closeDB() + if self.env is not None: + self.env.close() + self.env = None + test_support.rmtree(self.homeDir) + + def addDataToDB(self, data) : + i = 0 + for item in data: + self.db.put(item, str(i)) + i = i + 1 + + def createDB(self, key_comparator) : + self.db = db.DB(self.env) + self.setupDB(key_comparator) + self.db.open(self.filename, "test", db.DB_BTREE, db.DB_CREATE) + + def setupDB(self, key_comparator) : + self.db.set_bt_compare(key_comparator) + + def closeDB(self) : + if self.db is not None: + self.db.close() + self.db = None + + def startTest(self) : + pass + + def finishTest(self, expected = None) : + if expected is not None: + self.check_results(expected) + self.closeDB() + + def check_results(self, expected) : + curs = self.db.cursor() + try: + index = 0 + rec = curs.first() + while rec: + key, ignore = rec + self.assertLess(index, len(expected), + "to many values returned from cursor") + self.assertEqual(expected[index], key, + "expected value `%s' at %d but got `%s'" + % (expected[index], index, key)) + index = index + 1 + rec = curs.next() + self.assertEqual(index, len(expected), + "not enough values returned from cursor") + finally: + curs.close() + +class BtreeKeyCompareTestCase(AbstractBtreeKeyCompareTestCase) : + def runCompareTest(self, comparator, data) : + self.startTest() + self.createDB(comparator) + self.addDataToDB(data) + self.finishTest(data) + + def test_lexical_ordering(self) : + self.runCompareTest(lexical_cmp, _expected_lexical_test_data) + + def test_reverse_lexical_ordering(self) : + expected_rev_data = _expected_lexical_test_data[:] + expected_rev_data.reverse() + self.runCompareTest(make_reverse_comparator(lexical_cmp), + expected_rev_data) + + def test_compare_function_useless(self) : + self.startTest() + def socialist_comparator(l, r) : + return 0 + self.createDB(socialist_comparator) + self.addDataToDB(['b', 'a', 'd']) + # all things being equal the first key will be the only key + # in the database... (with the last key's value fwiw) + self.finishTest(['b']) + + +class BtreeExceptionsTestCase(AbstractBtreeKeyCompareTestCase) : + def test_raises_non_callable(self) : + self.startTest() + self.assertRaises(TypeError, self.createDB, 'abc') + self.assertRaises(TypeError, self.createDB, None) + self.finishTest() + + def test_set_bt_compare_with_function(self) : + self.startTest() + self.createDB(lexical_cmp) + self.finishTest() + + def check_results(self, results) : + pass + + def test_compare_function_incorrect(self) : + self.startTest() + def bad_comparator(l, r) : + return 1 + # verify that set_bt_compare checks that comparator('', '') == 0 + self.assertRaises(TypeError, self.createDB, bad_comparator) + self.finishTest() + + def verifyStderr(self, method, successRe) : + """ + Call method() while capturing sys.stderr output internally and + call self.fail() if successRe.search() does not match the stderr + output. This is used to test for uncatchable exceptions. + """ + stdErr = sys.stderr + sys.stderr = StringIO() + try: + method() + finally: + temp = sys.stderr + sys.stderr = stdErr + errorOut = temp.getvalue() + if not successRe.search(errorOut) : + self.fail("unexpected stderr output:\n"+errorOut) + if sys.version_info < (3, 0) : # XXX: How to do this in Py3k ??? + sys.exc_traceback = sys.last_traceback = None + + def _test_compare_function_exception(self) : + self.startTest() + def bad_comparator(l, r) : + if l == r: + # pass the set_bt_compare test + return 0 + raise RuntimeError, "i'm a naughty comparison function" + self.createDB(bad_comparator) + #print "\n*** test should print 2 uncatchable tracebacks ***" + self.addDataToDB(['a', 'b', 'c']) # this should raise, but... + self.finishTest() + + def test_compare_function_exception(self) : + self.verifyStderr( + self._test_compare_function_exception, + re.compile('(^RuntimeError:.* naughty.*){2}', re.M|re.S) + ) + + def _test_compare_function_bad_return(self) : + self.startTest() + def bad_comparator(l, r) : + if l == r: + # pass the set_bt_compare test + return 0 + return l + self.createDB(bad_comparator) + #print "\n*** test should print 2 errors about returning an int ***" + self.addDataToDB(['a', 'b', 'c']) # this should raise, but... + self.finishTest() + + def test_compare_function_bad_return(self) : + self.verifyStderr( + self._test_compare_function_bad_return, + re.compile('(^TypeError:.* return an int.*){2}', re.M|re.S) + ) + + + def test_cannot_assign_twice(self) : + + def my_compare(a, b) : + return 0 + + self.startTest() + self.createDB(my_compare) + self.assertRaises(RuntimeError, self.db.set_bt_compare, my_compare) + +class AbstractDuplicateCompareTestCase(unittest.TestCase) : + env = None + db = None + + if (sys.version_info < (2, 7)) or ((sys.version_info >= (3,0)) and + (sys.version_info < (3, 2))) : + def assertLess(self, a, b, msg=None) : + return self.assertTrue(a<b, msg=msg) + + def setUp(self) : + self.filename = self.__class__.__name__ + '.db' + self.homeDir = get_new_environment_path() + env = db.DBEnv() + env.open(self.homeDir, + db.DB_CREATE | db.DB_INIT_MPOOL + | db.DB_INIT_LOCK | db.DB_THREAD) + self.env = env + + def tearDown(self) : + self.closeDB() + if self.env is not None: + self.env.close() + self.env = None + test_support.rmtree(self.homeDir) + + def addDataToDB(self, data) : + for item in data: + self.db.put("key", item) + + def createDB(self, dup_comparator) : + self.db = db.DB(self.env) + self.setupDB(dup_comparator) + self.db.open(self.filename, "test", db.DB_BTREE, db.DB_CREATE) + + def setupDB(self, dup_comparator) : + self.db.set_flags(db.DB_DUPSORT) + self.db.set_dup_compare(dup_comparator) + + def closeDB(self) : + if self.db is not None: + self.db.close() + self.db = None + + def startTest(self) : + pass + + def finishTest(self, expected = None) : + if expected is not None: + self.check_results(expected) + self.closeDB() + + def check_results(self, expected) : + curs = self.db.cursor() + try: + index = 0 + rec = curs.first() + while rec: + ignore, data = rec + self.assertLess(index, len(expected), + "to many values returned from cursor") + self.assertEqual(expected[index], data, + "expected value `%s' at %d but got `%s'" + % (expected[index], index, data)) + index = index + 1 + rec = curs.next() + self.assertEqual(index, len(expected), + "not enough values returned from cursor") + finally: + curs.close() + +class DuplicateCompareTestCase(AbstractDuplicateCompareTestCase) : + def runCompareTest(self, comparator, data) : + self.startTest() + self.createDB(comparator) + self.addDataToDB(data) + self.finishTest(data) + + def test_lexical_ordering(self) : + self.runCompareTest(lexical_cmp, _expected_lexical_test_data) + + def test_reverse_lexical_ordering(self) : + expected_rev_data = _expected_lexical_test_data[:] + expected_rev_data.reverse() + self.runCompareTest(make_reverse_comparator(lexical_cmp), + expected_rev_data) + +class DuplicateExceptionsTestCase(AbstractDuplicateCompareTestCase) : + def test_raises_non_callable(self) : + self.startTest() + self.assertRaises(TypeError, self.createDB, 'abc') + self.assertRaises(TypeError, self.createDB, None) + self.finishTest() + + def test_set_dup_compare_with_function(self) : + self.startTest() + self.createDB(lexical_cmp) + self.finishTest() + + def check_results(self, results) : + pass + + def test_compare_function_incorrect(self) : + self.startTest() + def bad_comparator(l, r) : + return 1 + # verify that set_dup_compare checks that comparator('', '') == 0 + self.assertRaises(TypeError, self.createDB, bad_comparator) + self.finishTest() + + def test_compare_function_useless(self) : + self.startTest() + def socialist_comparator(l, r) : + return 0 + self.createDB(socialist_comparator) + # DUPSORT does not allow "duplicate duplicates" + self.assertRaises(db.DBKeyExistError, self.addDataToDB, ['b', 'a', 'd']) + self.finishTest() + + def verifyStderr(self, method, successRe) : + """ + Call method() while capturing sys.stderr output internally and + call self.fail() if successRe.search() does not match the stderr + output. This is used to test for uncatchable exceptions. + """ + stdErr = sys.stderr + sys.stderr = StringIO() + try: + method() + finally: + temp = sys.stderr + sys.stderr = stdErr + errorOut = temp.getvalue() + if not successRe.search(errorOut) : + self.fail("unexpected stderr output:\n"+errorOut) + if sys.version_info < (3, 0) : # XXX: How to do this in Py3k ??? + sys.exc_traceback = sys.last_traceback = None + + def _test_compare_function_exception(self) : + self.startTest() + def bad_comparator(l, r) : + if l == r: + # pass the set_dup_compare test + return 0 + raise RuntimeError, "i'm a naughty comparison function" + self.createDB(bad_comparator) + #print "\n*** test should print 2 uncatchable tracebacks ***" + self.addDataToDB(['a', 'b', 'c']) # this should raise, but... + self.finishTest() + + def test_compare_function_exception(self) : + self.verifyStderr( + self._test_compare_function_exception, + re.compile('(^RuntimeError:.* naughty.*){2}', re.M|re.S) + ) + + def _test_compare_function_bad_return(self) : + self.startTest() + def bad_comparator(l, r) : + if l == r: + # pass the set_dup_compare test + return 0 + return l + self.createDB(bad_comparator) + #print "\n*** test should print 2 errors about returning an int ***" + self.addDataToDB(['a', 'b', 'c']) # this should raise, but... + self.finishTest() + + def test_compare_function_bad_return(self) : + self.verifyStderr( + self._test_compare_function_bad_return, + re.compile('(^TypeError:.* return an int.*){2}', re.M|re.S) + ) + + + def test_cannot_assign_twice(self) : + + def my_compare(a, b) : + return 0 + + self.startTest() + self.createDB(my_compare) + self.assertRaises(RuntimeError, self.db.set_dup_compare, my_compare) + +def test_suite() : + res = unittest.TestSuite() + + res.addTest(unittest.makeSuite(ComparatorTests)) + res.addTest(unittest.makeSuite(BtreeExceptionsTestCase)) + res.addTest(unittest.makeSuite(BtreeKeyCompareTestCase)) + res.addTest(unittest.makeSuite(DuplicateExceptionsTestCase)) + res.addTest(unittest.makeSuite(DuplicateCompareTestCase)) + return res + +if __name__ == '__main__': + unittest.main(defaultTest = 'suite') |