summaryrefslogtreecommitdiff
path: root/lib/python2.7/bsddb/test/test_distributed_transactions.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/bsddb/test/test_distributed_transactions.py')
-rw-r--r--lib/python2.7/bsddb/test/test_distributed_transactions.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/lib/python2.7/bsddb/test/test_distributed_transactions.py b/lib/python2.7/bsddb/test/test_distributed_transactions.py
new file mode 100644
index 0000000..9058575
--- /dev/null
+++ b/lib/python2.7/bsddb/test/test_distributed_transactions.py
@@ -0,0 +1,152 @@
+"""TestCases for distributed transactions.
+"""
+
+import os
+import unittest
+
+from test_all import db, test_support, get_new_environment_path, \
+ get_new_database_path
+
+from test_all import verbose
+
+#----------------------------------------------------------------------
+
+class DBTxn_distributed(unittest.TestCase):
+ num_txns=1234
+ nosync=True
+ must_open_db=False
+ def _create_env(self, must_open_db) :
+ self.dbenv = db.DBEnv()
+ self.dbenv.set_tx_max(self.num_txns)
+ self.dbenv.set_lk_max_lockers(self.num_txns*2)
+ self.dbenv.set_lk_max_locks(self.num_txns*2)
+ self.dbenv.set_lk_max_objects(self.num_txns*2)
+ if self.nosync :
+ self.dbenv.set_flags(db.DB_TXN_NOSYNC,True)
+ self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD |
+ db.DB_RECOVER |
+ db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
+ db.DB_INIT_LOCK, 0666)
+ self.db = db.DB(self.dbenv)
+ self.db.set_re_len(db.DB_GID_SIZE)
+ if must_open_db :
+ txn=self.dbenv.txn_begin()
+ self.db.open(self.filename,
+ db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666,
+ txn=txn)
+ txn.commit()
+
+ def setUp(self) :
+ self.homeDir = get_new_environment_path()
+ self.filename = "test"
+ return self._create_env(must_open_db=True)
+
+ def _destroy_env(self):
+ if self.nosync or (db.version()[:2] == (4,6)): # Known bug
+ self.dbenv.log_flush()
+ self.db.close()
+ self.dbenv.close()
+
+ def tearDown(self):
+ self._destroy_env()
+ test_support.rmtree(self.homeDir)
+
+ def _recreate_env(self,must_open_db) :
+ self._destroy_env()
+ self._create_env(must_open_db)
+
+ def test01_distributed_transactions(self) :
+ txns=set()
+ adapt = lambda x : x
+ import sys
+ if sys.version_info[0] >= 3 :
+ adapt = lambda x : bytes(x, "ascii")
+ # Create transactions, "prepare" them, and
+ # let them be garbage collected.
+ for i in xrange(self.num_txns) :
+ txn = self.dbenv.txn_begin()
+ gid = "%%%dd" %db.DB_GID_SIZE
+ gid = adapt(gid %i)
+ self.db.put(i, gid, txn=txn, flags=db.DB_APPEND)
+ txns.add(gid)
+ txn.prepare(gid)
+ del txn
+
+ self._recreate_env(self.must_open_db)
+
+ # Get "to be recovered" transactions but
+ # let them be garbage collected.
+ recovered_txns=self.dbenv.txn_recover()
+ self.assertEqual(self.num_txns,len(recovered_txns))
+ for gid,txn in recovered_txns :
+ self.assertTrue(gid in txns)
+ del txn
+ del recovered_txns
+
+ self._recreate_env(self.must_open_db)
+
+ # Get "to be recovered" transactions. Commit, abort and
+ # discard them.
+ recovered_txns=self.dbenv.txn_recover()
+ self.assertEqual(self.num_txns,len(recovered_txns))
+ discard_txns=set()
+ committed_txns=set()
+ state=0
+ for gid,txn in recovered_txns :
+ if state==0 or state==1:
+ committed_txns.add(gid)
+ txn.commit()
+ elif state==2 :
+ txn.abort()
+ elif state==3 :
+ txn.discard()
+ discard_txns.add(gid)
+ state=-1
+ state+=1
+ del txn
+ del recovered_txns
+
+ self._recreate_env(self.must_open_db)
+
+ # Verify the discarded transactions are still
+ # around, and dispose them.
+ recovered_txns=self.dbenv.txn_recover()
+ self.assertEqual(len(discard_txns),len(recovered_txns))
+ for gid,txn in recovered_txns :
+ txn.abort()
+ del txn
+ del recovered_txns
+
+ self._recreate_env(must_open_db=True)
+
+ # Be sure there are not pending transactions.
+ # Check also database size.
+ recovered_txns=self.dbenv.txn_recover()
+ self.assertTrue(len(recovered_txns)==0)
+ self.assertEqual(len(committed_txns),self.db.stat()["nkeys"])
+
+class DBTxn_distributedSYNC(DBTxn_distributed):
+ nosync=False
+
+class DBTxn_distributed_must_open_db(DBTxn_distributed):
+ must_open_db=True
+
+class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed):
+ nosync=False
+ must_open_db=True
+
+#----------------------------------------------------------------------
+
+def test_suite():
+ suite = unittest.TestSuite()
+ if db.version() >= (4,5) :
+ suite.addTest(unittest.makeSuite(DBTxn_distributed))
+ suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC))
+ if db.version() >= (4,6) :
+ suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db))
+ suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')