diff options
Diffstat (limited to 'lib/python2.7/bsddb/test/test_associate.py')
-rw-r--r-- | lib/python2.7/bsddb/test/test_associate.py | 465 |
1 files changed, 465 insertions, 0 deletions
diff --git a/lib/python2.7/bsddb/test/test_associate.py b/lib/python2.7/bsddb/test/test_associate.py new file mode 100644 index 0000000..7a49e11 --- /dev/null +++ b/lib/python2.7/bsddb/test/test_associate.py @@ -0,0 +1,465 @@ +""" +TestCases for DB.associate. +""" + +import sys, os, string +import time +from pprint import pprint + +import unittest +from test_all import db, dbshelve, test_support, verbose, have_threads, \ + get_new_environment_path + + +#---------------------------------------------------------------------- + + +musicdata = { +1 : ("Bad English", "The Price Of Love", "Rock"), +2 : ("DNA featuring Suzanne Vega", "Tom's Diner", "Rock"), +3 : ("George Michael", "Praying For Time", "Rock"), +4 : ("Gloria Estefan", "Here We Are", "Rock"), +5 : ("Linda Ronstadt", "Don't Know Much", "Rock"), +6 : ("Michael Bolton", "How Am I Supposed To Live Without You", "Blues"), +7 : ("Paul Young", "Oh Girl", "Rock"), +8 : ("Paula Abdul", "Opposites Attract", "Rock"), +9 : ("Richard Marx", "Should've Known Better", "Rock"), +10: ("Rod Stewart", "Forever Young", "Rock"), +11: ("Roxette", "Dangerous", "Rock"), +12: ("Sheena Easton", "The Lover In Me", "Rock"), +13: ("Sinead O'Connor", "Nothing Compares 2 U", "Rock"), +14: ("Stevie B.", "Because I Love You", "Rock"), +15: ("Taylor Dayne", "Love Will Lead You Back", "Rock"), +16: ("The Bangles", "Eternal Flame", "Rock"), +17: ("Wilson Phillips", "Release Me", "Rock"), +18: ("Billy Joel", "Blonde Over Blue", "Rock"), +19: ("Billy Joel", "Famous Last Words", "Rock"), +20: ("Billy Joel", "Lullabye (Goodnight, My Angel)", "Rock"), +21: ("Billy Joel", "The River Of Dreams", "Rock"), +22: ("Billy Joel", "Two Thousand Years", "Rock"), +23: ("Janet Jackson", "Alright", "Rock"), +24: ("Janet Jackson", "Black Cat", "Rock"), +25: ("Janet Jackson", "Come Back To Me", "Rock"), +26: ("Janet Jackson", "Escapade", "Rock"), +27: ("Janet Jackson", "Love Will Never Do (Without You)", "Rock"), +28: ("Janet Jackson", "Miss You Much", "Rock"), +29: ("Janet Jackson", "Rhythm Nation", "Rock"), +30: ("Janet Jackson", "State Of The World", "Rock"), +31: ("Janet Jackson", "The Knowledge", "Rock"), +32: ("Spyro Gyra", "End of Romanticism", "Jazz"), +33: ("Spyro Gyra", "Heliopolis", "Jazz"), +34: ("Spyro Gyra", "Jubilee", "Jazz"), +35: ("Spyro Gyra", "Little Linda", "Jazz"), +36: ("Spyro Gyra", "Morning Dance", "Jazz"), +37: ("Spyro Gyra", "Song for Lorraine", "Jazz"), +38: ("Yes", "Owner Of A Lonely Heart", "Rock"), +39: ("Yes", "Rhythm Of Love", "Rock"), +40: ("Cusco", "Dream Catcher", "New Age"), +41: ("Cusco", "Geronimos Laughter", "New Age"), +42: ("Cusco", "Ghost Dance", "New Age"), +43: ("Blue Man Group", "Drumbone", "New Age"), +44: ("Blue Man Group", "Endless Column", "New Age"), +45: ("Blue Man Group", "Klein Mandelbrot", "New Age"), +46: ("Kenny G", "Silhouette", "Jazz"), +47: ("Sade", "Smooth Operator", "Jazz"), +48: ("David Arkenstone", "Papillon (On The Wings Of The Butterfly)", + "New Age"), +49: ("David Arkenstone", "Stepping Stars", "New Age"), +50: ("David Arkenstone", "Carnation Lily Lily Rose", "New Age"), +51: ("David Lanz", "Behind The Waterfall", "New Age"), +52: ("David Lanz", "Cristofori's Dream", "New Age"), +53: ("David Lanz", "Heartsounds", "New Age"), +54: ("David Lanz", "Leaves on the Seine", "New Age"), +99: ("unknown artist", "Unnamed song", "Unknown"), +} + +#---------------------------------------------------------------------- + +class AssociateErrorTestCase(unittest.TestCase): + def setUp(self): + self.filename = self.__class__.__name__ + '.db' + self.homeDir = get_new_environment_path() + self.env = db.DBEnv() + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL) + + def tearDown(self): + self.env.close() + self.env = None + test_support.rmtree(self.homeDir) + + def test00_associateDBError(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test00_associateDBError..." % \ + self.__class__.__name__ + + dupDB = db.DB(self.env) + dupDB.set_flags(db.DB_DUP) + dupDB.open(self.filename, "primary", db.DB_BTREE, db.DB_CREATE) + + secDB = db.DB(self.env) + secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE) + + # dupDB has been configured to allow duplicates, it can't + # associate with a secondary. Berkeley DB will return an error. + try: + def f(a,b): return a+b + dupDB.associate(secDB, f) + except db.DBError: + # good + secDB.close() + dupDB.close() + else: + secDB.close() + dupDB.close() + self.fail("DBError exception was expected") + + + +#---------------------------------------------------------------------- + + +class AssociateTestCase(unittest.TestCase): + keytype = '' + envFlags = 0 + dbFlags = 0 + + def setUp(self): + self.filename = self.__class__.__name__ + '.db' + self.homeDir = get_new_environment_path() + self.env = db.DBEnv() + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | + db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags) + + def tearDown(self): + self.closeDB() + self.env.close() + self.env = None + test_support.rmtree(self.homeDir) + + def addDataToDB(self, d, txn=None): + for key, value in musicdata.items(): + if type(self.keytype) == type(''): + key = "%02d" % key + d.put(key, '|'.join(value), txn=txn) + + def createDB(self, txn=None): + self.cur = None + self.secDB = None + self.primary = db.DB(self.env) + self.primary.set_get_returns_none(2) + self.primary.open(self.filename, "primary", self.dbtype, + db.DB_CREATE | db.DB_THREAD | self.dbFlags, txn=txn) + + def closeDB(self): + if self.cur: + self.cur.close() + self.cur = None + if self.secDB: + self.secDB.close() + self.secDB = None + self.primary.close() + self.primary = None + + def getDB(self): + return self.primary + + + def _associateWithDB(self, getGenre): + self.createDB() + + self.secDB = db.DB(self.env) + self.secDB.set_flags(db.DB_DUP) + self.secDB.set_get_returns_none(2) + self.secDB.open(self.filename, "secondary", db.DB_BTREE, + db.DB_CREATE | db.DB_THREAD | self.dbFlags) + self.getDB().associate(self.secDB, getGenre) + + self.addDataToDB(self.getDB()) + + self.finish_test(self.secDB) + + def test01_associateWithDB(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test01_associateWithDB..." % \ + self.__class__.__name__ + + return self._associateWithDB(self.getGenre) + + def _associateAfterDB(self, getGenre) : + self.createDB() + self.addDataToDB(self.getDB()) + + self.secDB = db.DB(self.env) + self.secDB.set_flags(db.DB_DUP) + self.secDB.open(self.filename, "secondary", db.DB_BTREE, + db.DB_CREATE | db.DB_THREAD | self.dbFlags) + + # adding the DB_CREATE flag will cause it to index existing records + self.getDB().associate(self.secDB, getGenre, db.DB_CREATE) + + self.finish_test(self.secDB) + + def test02_associateAfterDB(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test02_associateAfterDB..." % \ + self.__class__.__name__ + + return self._associateAfterDB(self.getGenre) + + if db.version() >= (4, 6): + def test03_associateWithDB(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test03_associateWithDB..." % \ + self.__class__.__name__ + + return self._associateWithDB(self.getGenreList) + + def test04_associateAfterDB(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test04_associateAfterDB..." % \ + self.__class__.__name__ + + return self._associateAfterDB(self.getGenreList) + + + def finish_test(self, secDB, txn=None): + # 'Blues' should not be in the secondary database + vals = secDB.pget('Blues', txn=txn) + self.assertEqual(vals, None, vals) + + vals = secDB.pget('Unknown', txn=txn) + self.assertTrue(vals[0] == 99 or vals[0] == '99', vals) + vals[1].index('Unknown') + vals[1].index('Unnamed') + vals[1].index('unknown') + + if verbose: + print "Primary key traversal:" + self.cur = self.getDB().cursor(txn) + count = 0 + rec = self.cur.first() + while rec is not None: + if type(self.keytype) == type(''): + self.assertTrue(int(rec[0])) # for primary db, key is a number + else: + self.assertTrue(rec[0] and type(rec[0]) == type(0)) + count = count + 1 + if verbose: + print rec + rec = getattr(self.cur, "next")() + self.assertEqual(count, len(musicdata)) # all items accounted for + + + if verbose: + print "Secondary key traversal:" + self.cur = secDB.cursor(txn) + count = 0 + + # test cursor pget + vals = self.cur.pget('Unknown', flags=db.DB_LAST) + self.assertTrue(vals[1] == 99 or vals[1] == '99', vals) + self.assertEqual(vals[0], 'Unknown') + vals[2].index('Unknown') + vals[2].index('Unnamed') + vals[2].index('unknown') + + vals = self.cur.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH) + self.assertEqual(vals, None, vals) + + rec = self.cur.first() + self.assertEqual(rec[0], "Jazz") + while rec is not None: + count = count + 1 + if verbose: + print rec + rec = getattr(self.cur, "next")() + # all items accounted for EXCEPT for 1 with "Blues" genre + self.assertEqual(count, len(musicdata)-1) + + self.cur = None + + def getGenre(self, priKey, priData): + self.assertEqual(type(priData), type("")) + genre = priData.split('|')[2] + + if verbose: + print 'getGenre key: %r data: %r' % (priKey, priData) + + if genre == 'Blues': + return db.DB_DONOTINDEX + else: + return genre + + def getGenreList(self, priKey, PriData) : + v = self.getGenre(priKey, PriData) + if type(v) == type("") : + v = [v] + return v + + +#---------------------------------------------------------------------- + + +class AssociateHashTestCase(AssociateTestCase): + dbtype = db.DB_HASH + +class AssociateBTreeTestCase(AssociateTestCase): + dbtype = db.DB_BTREE + +class AssociateRecnoTestCase(AssociateTestCase): + dbtype = db.DB_RECNO + keytype = 0 + +#---------------------------------------------------------------------- + +class AssociateBTreeTxnTestCase(AssociateBTreeTestCase): + envFlags = db.DB_INIT_TXN + dbFlags = 0 + + def txn_finish_test(self, sDB, txn): + try: + self.finish_test(sDB, txn=txn) + finally: + if self.cur: + self.cur.close() + self.cur = None + if txn: + txn.commit() + + def test13_associate_in_transaction(self): + if verbose: + print '\n', '-=' * 30 + print "Running %s.test13_associateAutoCommit..." % \ + self.__class__.__name__ + + txn = self.env.txn_begin() + try: + self.createDB(txn=txn) + + self.secDB = db.DB(self.env) + self.secDB.set_flags(db.DB_DUP) + self.secDB.set_get_returns_none(2) + self.secDB.open(self.filename, "secondary", db.DB_BTREE, + db.DB_CREATE | db.DB_THREAD, txn=txn) + self.getDB().associate(self.secDB, self.getGenre, txn=txn) + + self.addDataToDB(self.getDB(), txn=txn) + except: + txn.abort() + raise + + self.txn_finish_test(self.secDB, txn=txn) + + +#---------------------------------------------------------------------- + +class ShelveAssociateTestCase(AssociateTestCase): + + def createDB(self): + self.primary = dbshelve.open(self.filename, + dbname="primary", + dbenv=self.env, + filetype=self.dbtype) + + def addDataToDB(self, d): + for key, value in musicdata.items(): + if type(self.keytype) == type(''): + key = "%02d" % key + d.put(key, value) # save the value as is this time + + + def getGenre(self, priKey, priData): + self.assertEqual(type(priData), type(())) + if verbose: + print 'getGenre key: %r data: %r' % (priKey, priData) + genre = priData[2] + if genre == 'Blues': + return db.DB_DONOTINDEX + else: + return genre + + +class ShelveAssociateHashTestCase(ShelveAssociateTestCase): + dbtype = db.DB_HASH + +class ShelveAssociateBTreeTestCase(ShelveAssociateTestCase): + dbtype = db.DB_BTREE + +class ShelveAssociateRecnoTestCase(ShelveAssociateTestCase): + dbtype = db.DB_RECNO + keytype = 0 + + +#---------------------------------------------------------------------- + +class ThreadedAssociateTestCase(AssociateTestCase): + + def addDataToDB(self, d): + t1 = Thread(target = self.writer1, + args = (d, )) + t2 = Thread(target = self.writer2, + args = (d, )) + + t1.setDaemon(True) + t2.setDaemon(True) + t1.start() + t2.start() + t1.join() + t2.join() + + def writer1(self, d): + for key, value in musicdata.items(): + if type(self.keytype) == type(''): + key = "%02d" % key + d.put(key, '|'.join(value)) + + def writer2(self, d): + for x in range(100, 600): + key = 'z%2d' % x + value = [key] * 4 + d.put(key, '|'.join(value)) + + +class ThreadedAssociateHashTestCase(ShelveAssociateTestCase): + dbtype = db.DB_HASH + +class ThreadedAssociateBTreeTestCase(ShelveAssociateTestCase): + dbtype = db.DB_BTREE + +class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase): + dbtype = db.DB_RECNO + keytype = 0 + + +#---------------------------------------------------------------------- + +def test_suite(): + suite = unittest.TestSuite() + + suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) + + suite.addTest(unittest.makeSuite(AssociateHashTestCase)) + suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) + + suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) + + suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) + + if have_threads: + suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) + + return suite + + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') |