author | rahulp13 | 2020-03-17 14:55:41 +0530
---|---|---
committer | rahulp13 | 2020-03-17 14:55:41 +0530
commit | 296443137f4288cb030e92859ccfbe3204bc1088 | (patch)
tree | ca4798c2da1e7244edc3bc108d81b462b537aea2 | /lib/python2.7/bsddb/test/test_replication.py
parent | 0db48f6533517ecebfd9f0693f89deca28408b76 | (diff)
initial commit
Diffstat (limited to 'lib/python2.7/bsddb/test/test_replication.py')
-rw-r--r-- | lib/python2.7/bsddb/test/test_replication.py | 543 |
1 file changed, 543 insertions, 0 deletions
```diff
diff --git a/lib/python2.7/bsddb/test/test_replication.py b/lib/python2.7/bsddb/test/test_replication.py
new file mode 100644
index 0000000..12ab2dd
--- /dev/null
+++ b/lib/python2.7/bsddb/test/test_replication.py
@@ -0,0 +1,543 @@
+"""TestCases for distributed transactions.
+"""
+
+import os
+import time
+import unittest
+
+from test_all import db, test_support, have_threads, verbose, \
+        get_new_environment_path, get_new_database_path
+
+
+#----------------------------------------------------------------------
+
+class DBReplication(unittest.TestCase) :
+    def setUp(self) :
+        self.homeDirMaster = get_new_environment_path()
+        self.homeDirClient = get_new_environment_path()
+
+        self.dbenvMaster = db.DBEnv()
+        self.dbenvClient = db.DBEnv()
+
+        # Must use "DB_THREAD" because the Replication Manager will
+        # be executed in other threads but will use the same environment.
+        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
+        self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
+                | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
+                db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
+        self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
+                | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
+                db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
+
+        self.confirmed_master=self.client_startupdone=False
+        def confirmed_master(a,b,c) :
+            if b==db.DB_EVENT_REP_MASTER :
+                self.confirmed_master=True
+
+        def client_startupdone(a,b,c) :
+            if b==db.DB_EVENT_REP_STARTUPDONE :
+                self.client_startupdone=True
+
+        self.dbenvMaster.set_event_notify(confirmed_master)
+        self.dbenvClient.set_event_notify(client_startupdone)
+
+        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
+        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
+        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+
+        self.dbMaster = self.dbClient = None
+
+
+    def tearDown(self):
+        if self.dbClient :
+            self.dbClient.close()
+        if self.dbMaster :
+            self.dbMaster.close()
+
+        # Here we assign dummy event handlers to allow GC of the test object.
+        # Since the dummy handler doesn't use any outer scope variable, it
+        # doesn't keep any reference to the test object.
+        def dummy(*args) :
+            pass
+        self.dbenvMaster.set_event_notify(dummy)
+        self.dbenvClient.set_event_notify(dummy)
+
+        self.dbenvClient.close()
+        self.dbenvMaster.close()
+        test_support.rmtree(self.homeDirClient)
+        test_support.rmtree(self.homeDirMaster)
+
+class DBReplicationManager(DBReplication) :
+    def test01_basic_replication(self) :
+        master_port = test_support.find_unused_port()
+        client_port = test_support.find_unused_port()
+        if db.version() >= (5, 2) :
+            self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
+            self.site.set_config(db.DB_GROUP_CREATOR, True)
+            self.site.set_config(db.DB_LOCAL_SITE, True)
+            self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)
+
+            self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
+            self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
+            self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
+            self.site4.set_config(db.DB_LOCAL_SITE, True)
+
+            d = {
+                db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
+                db.DB_GROUP_CREATOR: [True, False, False, False],
+                db.DB_LEGACY: [False, False, False, False],
+                db.DB_LOCAL_SITE: [True, False, False, True],
+                db.DB_REPMGR_PEER: [False, False, False, False ],
+                }
+
+            for i, j in d.items() :
+                for k, v in \
+                        zip([self.site, self.site2, self.site3, self.site4], j) :
+                    if v :
+                        self.assertTrue(k.get_config(i))
+                    else :
+                        self.assertFalse(k.get_config(i))
+
+            self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
+            self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())
+
+            for i, j in zip([self.site, self.site2, self.site3, self.site4], \
+                    [master_port, client_port, master_port, client_port]) :
+                addr = i.get_address()
+                self.assertEqual(addr, ("127.0.0.1", j))
+
+            for i in [self.site, self.site2] :
+                self.assertEqual(i.get_address(),
+                        self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
+            for i in [self.site3, self.site4] :
+                self.assertEqual(i.get_address(),
+                        self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
+        else :
+            self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
+            self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
+            self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
+            self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
+
+            self.dbenvMaster.rep_set_nsites(2)
+            self.dbenvClient.rep_set_nsites(2)
+
+        self.dbenvMaster.rep_set_priority(10)
+        self.dbenvClient.rep_set_priority(0)
+
+        self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
+        self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
+        self.assertEqual(self.dbenvMaster.rep_get_timeout(
+            db.DB_REP_CONNECTION_RETRY), 100123)
+        self.assertEqual(self.dbenvClient.rep_get_timeout(
+            db.DB_REP_CONNECTION_RETRY), 100321)
+
+        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
+        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
+        self.assertEqual(self.dbenvMaster.rep_get_timeout(
+            db.DB_REP_ELECTION_TIMEOUT), 100234)
+        self.assertEqual(self.dbenvClient.rep_get_timeout(
+            db.DB_REP_ELECTION_TIMEOUT), 100432)
+
+        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
+        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
+        self.assertEqual(self.dbenvMaster.rep_get_timeout(
+            db.DB_REP_ELECTION_RETRY), 100345)
+        self.assertEqual(self.dbenvClient.rep_get_timeout(
+            db.DB_REP_ELECTION_RETRY), 100543)
+
+        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
+        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
+
+        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
+        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
+
+        self.assertEqual(self.dbenvMaster.rep_get_nsites(),2)
+        self.assertEqual(self.dbenvClient.rep_get_nsites(),2)
+        self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
+        self.assertEqual(self.dbenvClient.rep_get_priority(),0)
+        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
+                db.DB_REPMGR_ACKS_ALL)
+        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
+                db.DB_REPMGR_ACKS_ALL)
+
+        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
+        # is not generated if the master has no new transactions.
+        # This is solved in BDB 4.6 (#15542).
+        import time
+        timeout = time.time()+60
+        while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
+            time.sleep(0.02)
+        # self.client_startupdone does not always get set to True within
+        # the timeout. On windows this may be a deep issue, on other
+        # platforms it is likely just a timing issue, especially on slow
+        # virthost buildbots (see issue 3892 for more). Even though
+        # the timeout triggers, the rest of this test method usually passes
+        # (but not all of it always, see below). So we just note the
+        # timeout on stderr and keep soldering on.
+        if time.time()>timeout:
+            import sys
+            print >> sys.stderr, ("XXX: timeout happened before"
+                    "startup was confirmed - see issue 3892")
+            startup_timeout = True
+
+        d = self.dbenvMaster.repmgr_site_list()
+        self.assertEqual(len(d), 1)
+        d = d.values()[0]  # There is only one
+        self.assertEqual(d[0], "127.0.0.1")
+        self.assertEqual(d[1], client_port)
+        self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
+                (d[2]==db.DB_REPMGR_DISCONNECTED))
+
+        d = self.dbenvClient.repmgr_site_list()
+        self.assertEqual(len(d), 1)
+        d = d.values()[0]  # There is only one
+        self.assertEqual(d[0], "127.0.0.1")
+        self.assertEqual(d[1], master_port)
+        self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
+                (d[2]==db.DB_REPMGR_DISCONNECTED))
+
+        if db.version() >= (4,6) :
+            d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
+            self.assertTrue("msgs_queued" in d)
+
+        self.dbMaster=db.DB(self.dbenvMaster)
+        txn=self.dbenvMaster.txn_begin()
+        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
+        txn.commit()
+
+        import time,os.path
+        timeout=time.time()+10
+        while (time.time()<timeout) and \
+          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
+            time.sleep(0.01)
+
+        self.dbClient=db.DB(self.dbenvClient)
+        while True :
+            txn=self.dbenvClient.txn_begin()
+            try :
+                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
+                        mode=0666, txn=txn)
+            except db.DBRepHandleDeadError :
+                txn.abort()
+                self.dbClient.close()
+                self.dbClient=db.DB(self.dbenvClient)
+                continue
+
+            txn.commit()
+            break
+
+        txn=self.dbenvMaster.txn_begin()
+        self.dbMaster.put("ABC", "123", txn=txn)
+        txn.commit()
+        import time
+        timeout=time.time()+10
+        v=None
+        while (time.time()<timeout) and (v is None) :
+            txn=self.dbenvClient.txn_begin()
+            v=self.dbClient.get("ABC", txn=txn)
+            txn.commit()
+            if v is None :
+                time.sleep(0.02)
+        # If startup did not happen before the timeout above, then this test
+        # sometimes fails. This happens randomly, which causes buildbot
+        # instability, but all the other bsddb tests pass. Since bsddb3 in the
+        # stdlib is currently not getting active maintenance, and is gone in
+        # py3k, we just skip the end of the test in that case.
+        if time.time()>=timeout and startup_timeout:
+            self.skipTest("replication test skipped due to random failure, "
+                    "see issue 3892")
+        self.assertTrue(time.time()<timeout)
+        self.assertEqual("123", v)
+
+        txn=self.dbenvMaster.txn_begin()
+        self.dbMaster.delete("ABC", txn=txn)
+        txn.commit()
+        timeout=time.time()+10
+        while (time.time()<timeout) and (v is not None) :
+            txn=self.dbenvClient.txn_begin()
+            v=self.dbClient.get("ABC", txn=txn)
+            txn.commit()
+            if v is None :
+                time.sleep(0.02)
+        self.assertTrue(time.time()<timeout)
+        self.assertEqual(None, v)
+
+class DBBaseReplication(DBReplication) :
+    def setUp(self) :
+        DBReplication.setUp(self)
+        def confirmed_master(a,b,c) :
+            if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
+                self.confirmed_master = True
+
+        def client_startupdone(a,b,c) :
+            if b == db.DB_EVENT_REP_STARTUPDONE :
+                self.client_startupdone = True
+
+        self.dbenvMaster.set_event_notify(confirmed_master)
+        self.dbenvClient.set_event_notify(client_startupdone)
+
+        import Queue
+        self.m2c = Queue.Queue()
+        self.c2m = Queue.Queue()
+
+        # There are only two nodes, so we don't need to
+        # do any routing decision
+        def m2c(dbenv, control, rec, lsnp, envid, flags) :
+            self.m2c.put((control, rec))
+
+        def c2m(dbenv, control, rec, lsnp, envid, flags) :
+            self.c2m.put((control, rec))
+
+        self.dbenvMaster.rep_set_transport(13,m2c)
+        self.dbenvMaster.rep_set_priority(10)
+        self.dbenvClient.rep_set_transport(3,c2m)
+        self.dbenvClient.rep_set_priority(0)
+
+        self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
+        self.assertEqual(self.dbenvClient.rep_get_priority(),0)
+
+        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
+        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
+        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+
+        def thread_master() :
+            return self.thread_do(self.dbenvMaster, self.c2m, 3,
+                    self.master_doing_election, True)
+
+        def thread_client() :
+            return self.thread_do(self.dbenvClient, self.m2c, 13,
+                    self.client_doing_election, False)
+
+        from threading import Thread
+        t_m=Thread(target=thread_master)
+        t_c=Thread(target=thread_client)
+        import sys
+        if sys.version_info[0] < 3 :
+            t_m.setDaemon(True)
+            t_c.setDaemon(True)
+        else :
+            t_m.daemon = True
+            t_c.daemon = True
+
+        self.t_m = t_m
+        self.t_c = t_c
+
+        self.dbMaster = self.dbClient = None
+
+        self.master_doing_election=[False]
+        self.client_doing_election=[False]
+
+
+    def tearDown(self):
+        if self.dbClient :
+            self.dbClient.close()
+        if self.dbMaster :
+            self.dbMaster.close()
+        self.m2c.put(None)
+        self.c2m.put(None)
+        self.t_m.join()
+        self.t_c.join()
+
+        # Here we assign dummy event handlers to allow GC of the test object.
+        # Since the dummy handler doesn't use any outer scope variable, it
+        # doesn't keep any reference to the test object.
+        def dummy(*args) :
+            pass
+        self.dbenvMaster.set_event_notify(dummy)
+        self.dbenvClient.set_event_notify(dummy)
+        self.dbenvMaster.rep_set_transport(13,dummy)
+        self.dbenvClient.rep_set_transport(3,dummy)
+
+        self.dbenvClient.close()
+        self.dbenvMaster.close()
+        test_support.rmtree(self.homeDirClient)
+        test_support.rmtree(self.homeDirMaster)
+
+    def basic_rep_threading(self) :
+        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
+        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
+
+        def thread_do(env, q, envid, election_status, must_be_master) :
+            while True :
+                v=q.get()
+                if v is None : return
+                env.rep_process_message(v[0], v[1], envid)
+
+        self.thread_do = thread_do
+
+        self.t_m.start()
+        self.t_c.start()
+
+    def test01_basic_replication(self) :
+        self.basic_rep_threading()
+
+        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
+        # is not generated if the master has no new transactions.
+        # This is solved in BDB 4.6 (#15542).
+        import time
+        timeout = time.time()+60
+        while (time.time()<timeout) and not (self.confirmed_master and
+                self.client_startupdone) :
+            time.sleep(0.02)
+        self.assertTrue(time.time()<timeout)
+
+        self.dbMaster=db.DB(self.dbenvMaster)
+        txn=self.dbenvMaster.txn_begin()
+        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
+        txn.commit()
+
+        import time,os.path
+        timeout=time.time()+10
+        while (time.time()<timeout) and \
+          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
+            time.sleep(0.01)
+
+        self.dbClient=db.DB(self.dbenvClient)
+        while True :
+            txn=self.dbenvClient.txn_begin()
+            try :
+                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
+                        mode=0666, txn=txn)
+            except db.DBRepHandleDeadError :
+                txn.abort()
+                self.dbClient.close()
+                self.dbClient=db.DB(self.dbenvClient)
+                continue
+
+            txn.commit()
+            break
+
+        d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
+        self.assertTrue("master_changes" in d)
+
+        txn=self.dbenvMaster.txn_begin()
+        self.dbMaster.put("ABC", "123", txn=txn)
+        txn.commit()
+        import time
+        timeout=time.time()+10
+        v=None
+        while (time.time()<timeout) and (v is None) :
+            txn=self.dbenvClient.txn_begin()
+            v=self.dbClient.get("ABC", txn=txn)
+            txn.commit()
+            if v is None :
+                time.sleep(0.02)
+        self.assertTrue(time.time()<timeout)
+        self.assertEqual("123", v)
+
+        txn=self.dbenvMaster.txn_begin()
+        self.dbMaster.delete("ABC", txn=txn)
+        txn.commit()
+        timeout=time.time()+10
+        while (time.time()<timeout) and (v is not None) :
+            txn=self.dbenvClient.txn_begin()
+            v=self.dbClient.get("ABC", txn=txn)
+            txn.commit()
+            if v is None :
+                time.sleep(0.02)
+        self.assertTrue(time.time()<timeout)
+        self.assertEqual(None, v)
+
+    if db.version() >= (4,7) :
+        def test02_test_request(self) :
+            self.basic_rep_threading()
+            (minimum, maximum) = self.dbenvClient.rep_get_request()
+            self.dbenvClient.rep_set_request(minimum-1, maximum+1)
+            self.assertEqual(self.dbenvClient.rep_get_request(),
+                    (minimum-1, maximum+1))
+
+    if db.version() >= (4,6) :
+        def test03_master_election(self) :
+            # Get ready to hold an election
+            #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
+            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
+            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
+
+            def thread_do(env, q, envid, election_status, must_be_master) :
+                while True :
+                    v=q.get()
+                    if v is None : return
+                    r = env.rep_process_message(v[0],v[1],envid)
+                    if must_be_master and self.confirmed_master :
+                        self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
+                        must_be_master = False
+
+                    if r[0] == db.DB_REP_HOLDELECTION :
+                        def elect() :
+                            while True :
+                                try :
+                                    env.rep_elect(2, 1)
+                                    election_status[0] = False
+                                    break
+                                except db.DBRepUnavailError :
+                                    pass
+                        if not election_status[0] and not self.confirmed_master :
+                            from threading import Thread
+                            election_status[0] = True
+                            t=Thread(target=elect)
+                            import sys
+                            if sys.version_info[0] < 3 :
+                                t.setDaemon(True)
+                            else :
+                                t.daemon = True
+                            t.start()
+
+            self.thread_do = thread_do
+
+            self.t_m.start()
+            self.t_c.start()
+
+            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
+            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
+            self.client_doing_election[0] = True
+            while True :
+                try :
+                    self.dbenvClient.rep_elect(2, 1)
+                    self.client_doing_election[0] = False
+                    break
+                except db.DBRepUnavailError :
+                    pass
+
+            self.assertTrue(self.confirmed_master)
+
+            # Race condition showed up after upgrading to Solaris 10 Update 10
+            # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
+            # jcea@jcea.es: See private email from Paula Bingham (Oracle),
+            # in 20110929.
+            while not (self.dbenvClient.rep_stat()["startup_complete"]) :
+                pass
+
+    if db.version() >= (4,7) :
+        def test04_test_clockskew(self) :
+            fast, slow = 1234, 1230
+            self.dbenvMaster.rep_set_clockskew(fast, slow)
+            self.assertEqual((fast, slow),
+                    self.dbenvMaster.rep_get_clockskew())
+            self.basic_rep_threading()
+
+#----------------------------------------------------------------------
+
+def test_suite():
+    suite = unittest.TestSuite()
+    if db.version() >= (4, 6) :
+        dbenv = db.DBEnv()
+        try :
+            dbenv.repmgr_get_ack_policy()
+            ReplicationManager_available=True
+        except :
+            ReplicationManager_available=False
+        dbenv.close()
+        del dbenv
+        if ReplicationManager_available :
+            suite.addTest(unittest.makeSuite(DBReplicationManager))
+
+        if have_threads :
+            suite.addTest(unittest.makeSuite(DBBaseReplication))
+
+    return suite
+
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')
```
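For orientation only (this is not part of the commit above): the sketch below distills the pre-5.2 Replication Manager pairing that `DBReplicationManager.test01_basic_replication` drives, a master and a client `DBEnv` that discover each other over TCP and elect no one else because the client runs at priority 0. The import path, directories, and port numbers are illustrative placeholders, assuming the stdlib `bsddb.db` bindings shipped under `lib/python2.7/bsddb`.

```python
# Illustrative sketch only -- it mirrors the Replication Manager calls
# exercised by the test above; directories and ports are placeholders.
import os
from bsddb import db  # assumption: stdlib bindings from lib/python2.7/bsddb

ENV_FLAGS = (db.DB_CREATE | db.DB_INIT_TXN | db.DB_INIT_LOG |
             db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_REP |
             db.DB_RECOVER | db.DB_THREAD)

def make_site(home, local_port, remote_port, is_master):
    # DB_THREAD is required because the Replication Manager runs its own
    # threads against this environment (see the setUp() comment in the diff).
    if not os.path.isdir(home):
        os.makedirs(home)
    dbenv = db.DBEnv()
    dbenv.open(home, ENV_FLAGS, 0666)
    dbenv.repmgr_set_local_site("127.0.0.1", local_port)
    dbenv.repmgr_add_remote_site("127.0.0.1", remote_port)
    dbenv.rep_set_nsites(2)                         # two-site replication group
    dbenv.rep_set_priority(10 if is_master else 0)  # priority 0: never elected
    dbenv.repmgr_start(1, db.DB_REP_MASTER if is_master else db.DB_REP_CLIENT)
    return dbenv

master = make_site("/tmp/rep-master", 9001, 9002, True)
client = make_site("/tmp/rep-client", 9002, 9001, False)
```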