symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_replication.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """TestCases for distributed transactions.
       
     2 """
       
     3 
       
     4 import os
       
     5 import time
       
     6 import unittest
       
     7 
       
     8 from test_all import db, test_support, have_threads, verbose, \
       
     9         get_new_environment_path, get_new_database_path
       
    10 
       
    11 
       
    12 #----------------------------------------------------------------------
       
    13 
       
    14 class DBReplicationManager(unittest.TestCase):
       
    15     import sys
       
    16     if sys.version_info[:3] < (2, 4, 0):
       
    17         def assertTrue(self, expr, msg=None):
       
    18             self.failUnless(expr,msg=msg)
       
    19 
       
    20     def setUp(self) :
       
    21         self.homeDirMaster = get_new_environment_path()
       
    22         self.homeDirClient = get_new_environment_path()
       
    23 
       
    24         self.dbenvMaster = db.DBEnv()
       
    25         self.dbenvClient = db.DBEnv()
       
    26 
       
    27         # Must use "DB_THREAD" because the Replication Manager will
       
    28         # be executed in other threads but will use the same environment.
       
    29         # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
       
    30         self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
       
    31                 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
       
    32                 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
       
    33         self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
       
    34                 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
       
    35                 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
       
    36 
       
    37         self.confirmed_master=self.client_startupdone=False
       
    38         def confirmed_master(a,b,c) :
       
    39             if b==db.DB_EVENT_REP_MASTER :
       
    40                 self.confirmed_master=True
       
    41 
       
    42         def client_startupdone(a,b,c) :
       
    43             if b==db.DB_EVENT_REP_STARTUPDONE :
       
    44                 self.client_startupdone=True
       
    45 
       
    46         self.dbenvMaster.set_event_notify(confirmed_master)
       
    47         self.dbenvClient.set_event_notify(client_startupdone)
       
    48 
       
    49         #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
       
    50         #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
       
    51         #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
       
    52         #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
       
    53 
       
    54         self.dbMaster = self.dbClient = None
       
    55 
       
    56 
       
    57     def tearDown(self):
       
    58         if self.dbClient :
       
    59             self.dbClient.close()
       
    60         if self.dbMaster :
       
    61             self.dbMaster.close()
       
    62         self.dbenvClient.close()
       
    63         self.dbenvMaster.close()
       
    64         test_support.rmtree(self.homeDirClient)
       
    65         test_support.rmtree(self.homeDirMaster)
       
    66 
       
    67     def test01_basic_replication(self) :
       
    68         master_port = test_support.find_unused_port()
       
    69         self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
       
    70         client_port = test_support.find_unused_port()
       
    71         self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
       
    72         self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
       
    73         self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
       
    74         self.dbenvMaster.rep_set_nsites(2)
       
    75         self.dbenvClient.rep_set_nsites(2)
       
    76         self.dbenvMaster.rep_set_priority(10)
       
    77         self.dbenvClient.rep_set_priority(0)
       
    78 
       
    79         self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
       
    80         self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
       
    81         self.assertEquals(self.dbenvMaster.rep_get_timeout(
       
    82             db.DB_REP_CONNECTION_RETRY), 100123)
       
    83         self.assertEquals(self.dbenvClient.rep_get_timeout(
       
    84             db.DB_REP_CONNECTION_RETRY), 100321)
       
    85 
       
    86         self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
       
    87         self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
       
    88         self.assertEquals(self.dbenvMaster.rep_get_timeout(
       
    89             db.DB_REP_ELECTION_TIMEOUT), 100234)
       
    90         self.assertEquals(self.dbenvClient.rep_get_timeout(
       
    91             db.DB_REP_ELECTION_TIMEOUT), 100432)
       
    92 
       
    93         self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
       
    94         self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
       
    95         self.assertEquals(self.dbenvMaster.rep_get_timeout(
       
    96             db.DB_REP_ELECTION_RETRY), 100345)
       
    97         self.assertEquals(self.dbenvClient.rep_get_timeout(
       
    98             db.DB_REP_ELECTION_RETRY), 100543)
       
    99 
       
   100         self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
       
   101         self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
       
   102 
       
   103         self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
       
   104         self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
       
   105 
       
   106         self.assertEquals(self.dbenvMaster.rep_get_nsites(),2)
       
   107         self.assertEquals(self.dbenvClient.rep_get_nsites(),2)
       
   108         self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
       
   109         self.assertEquals(self.dbenvClient.rep_get_priority(),0)
       
   110         self.assertEquals(self.dbenvMaster.repmgr_get_ack_policy(),
       
   111                 db.DB_REPMGR_ACKS_ALL)
       
   112         self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
       
   113                 db.DB_REPMGR_ACKS_ALL)
       
   114 
       
   115         # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
       
   116         # is not generated if the master has no new transactions.
       
   117         # This is solved in BDB 4.6 (#15542).
       
   118         import time
       
   119         timeout = time.time()+10
       
   120         while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
       
   121             time.sleep(0.02)
       
   122         # this fails on Windows as self.client_startupdone never gets set
       
   123         # to True - see bug 3892.  BUT - even though this assertion
       
   124         # fails on Windows the rest of the test passes - so to prove
       
   125         # that we let the rest of the test run.  Sadly we can't
       
   126         # make use of raising TestSkipped() here (unittest still
       
   127         # reports it as an error), so we yell to stderr.
       
   128         import sys
       
   129         if sys.platform=="win32":
       
   130             print >> sys.stderr, \
       
   131                 "XXX - windows bsddb replication fails on windows and is skipped"
       
   132             print >> sys.stderr, "XXX - Please see issue #3892"
       
   133         else:
       
   134             self.assertTrue(time.time()<timeout)
       
   135 
       
   136         d = self.dbenvMaster.repmgr_site_list()
       
   137         self.assertEquals(len(d), 1)
       
   138         self.assertEquals(d[0][0], "127.0.0.1")
       
   139         self.assertEquals(d[0][1], client_port)
       
   140         self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \
       
   141                 (d[0][2]==db.DB_REPMGR_DISCONNECTED))
       
   142 
       
   143         d = self.dbenvClient.repmgr_site_list()
       
   144         self.assertEquals(len(d), 1)
       
   145         self.assertEquals(d[0][0], "127.0.0.1")
       
   146         self.assertEquals(d[0][1], master_port)
       
   147         self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \
       
   148                 (d[0][2]==db.DB_REPMGR_DISCONNECTED))
       
   149 
       
   150         if db.version() >= (4,6) :
       
   151             d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
       
   152             self.assertTrue("msgs_queued" in d)
       
   153 
       
   154         self.dbMaster=db.DB(self.dbenvMaster)
       
   155         txn=self.dbenvMaster.txn_begin()
       
   156         self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
       
   157         txn.commit()
       
   158 
       
   159         import time,os.path
       
   160         timeout=time.time()+10
       
   161         while (time.time()<timeout) and \
       
   162           not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
       
   163             time.sleep(0.01)
       
   164 
       
   165         self.dbClient=db.DB(self.dbenvClient)
       
   166         while True :
       
   167             txn=self.dbenvClient.txn_begin()
       
   168             try :
       
   169                 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
       
   170                         mode=0666, txn=txn)
       
   171             except db.DBRepHandleDeadError :
       
   172                 txn.abort()
       
   173                 self.dbClient.close()
       
   174                 self.dbClient=db.DB(self.dbenvClient)
       
   175                 continue
       
   176 
       
   177             txn.commit()
       
   178             break
       
   179 
       
   180         txn=self.dbenvMaster.txn_begin()
       
   181         self.dbMaster.put("ABC", "123", txn=txn)
       
   182         txn.commit()
       
   183         import time
       
   184         timeout=time.time()+10
       
   185         v=None
       
   186         while (time.time()<timeout) and (v==None) :
       
   187             txn=self.dbenvClient.txn_begin()
       
   188             v=self.dbClient.get("ABC", txn=txn)
       
   189             txn.commit()
       
   190             if v==None :
       
   191                 time.sleep(0.02)
       
   192         self.assertTrue(time.time()<timeout)
       
   193         self.assertEquals("123", v)
       
   194 
       
   195         txn=self.dbenvMaster.txn_begin()
       
   196         self.dbMaster.delete("ABC", txn=txn)
       
   197         txn.commit()
       
   198         timeout=time.time()+10
       
   199         while (time.time()<timeout) and (v!=None) :
       
   200             txn=self.dbenvClient.txn_begin()
       
   201             v=self.dbClient.get("ABC", txn=txn)
       
   202             txn.commit()
       
   203             if v==None :
       
   204                 time.sleep(0.02)
       
   205         self.assertTrue(time.time()<timeout)
       
   206         self.assertEquals(None, v)
       
   207 
       
   208 class DBBaseReplication(DBReplicationManager):
       
   209     def setUp(self) :
       
   210         DBReplicationManager.setUp(self)
       
   211         def confirmed_master(a,b,c) :
       
   212             if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
       
   213                 self.confirmed_master = True
       
   214 
       
   215         def client_startupdone(a,b,c) :
       
   216             if b == db.DB_EVENT_REP_STARTUPDONE :
       
   217                 self.client_startupdone = True
       
   218 
       
   219         self.dbenvMaster.set_event_notify(confirmed_master)
       
   220         self.dbenvClient.set_event_notify(client_startupdone)
       
   221 
       
   222         import Queue
       
   223         self.m2c = Queue.Queue()
       
   224         self.c2m = Queue.Queue()
       
   225 
       
   226         # There are only two nodes, so we don't need to
       
   227         # do any routing decision
       
   228         def m2c(dbenv, control, rec, lsnp, envid, flags) :
       
   229             self.m2c.put((control, rec))
       
   230 
       
   231         def c2m(dbenv, control, rec, lsnp, envid, flags) :
       
   232             self.c2m.put((control, rec))
       
   233 
       
   234         self.dbenvMaster.rep_set_transport(13,m2c)
       
   235         self.dbenvMaster.rep_set_priority(10)
       
   236         self.dbenvClient.rep_set_transport(3,c2m)
       
   237         self.dbenvClient.rep_set_priority(0)
       
   238 
       
   239         self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
       
   240         self.assertEquals(self.dbenvClient.rep_get_priority(),0)
       
   241 
       
   242         #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
       
   243         #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
       
   244         #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
       
   245         #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
       
   246 
       
   247         def thread_master() :
       
   248             return self.thread_do(self.dbenvMaster, self.c2m, 3,
       
   249                     self.master_doing_election, True)
       
   250 
       
   251         def thread_client() :
       
   252             return self.thread_do(self.dbenvClient, self.m2c, 13,
       
   253                     self.client_doing_election, False)
       
   254 
       
   255         from threading import Thread
       
   256         t_m=Thread(target=thread_master)
       
   257         t_c=Thread(target=thread_client)
       
   258         import sys
       
   259         if sys.version_info[0] < 3 :
       
   260             t_m.setDaemon(True)
       
   261             t_c.setDaemon(True)
       
   262         else :
       
   263             t_m.daemon = True
       
   264             t_c.daemon = True
       
   265 
       
   266         self.t_m = t_m
       
   267         self.t_c = t_c
       
   268 
       
   269         self.dbMaster = self.dbClient = None
       
   270 
       
   271         self.master_doing_election=[False]
       
   272         self.client_doing_election=[False]
       
   273 
       
   274 
       
   275     def tearDown(self):
       
   276         if self.dbClient :
       
   277             self.dbClient.close()
       
   278         if self.dbMaster :
       
   279             self.dbMaster.close()
       
   280         self.m2c.put(None)
       
   281         self.c2m.put(None)
       
   282         self.t_m.join()
       
   283         self.t_c.join()
       
   284         self.dbenvClient.close()
       
   285         self.dbenvMaster.close()
       
   286         test_support.rmtree(self.homeDirClient)
       
   287         test_support.rmtree(self.homeDirMaster)
       
   288 
       
   289     def basic_rep_threading(self) :
       
   290         self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
       
   291         self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
       
   292 
       
   293         def thread_do(env, q, envid, election_status, must_be_master) :
       
   294             while True :
       
   295                 v=q.get()
       
   296                 if v == None : return
       
   297                 env.rep_process_message(v[0], v[1], envid)
       
   298 
       
   299         self.thread_do = thread_do
       
   300 
       
   301         self.t_m.start()
       
   302         self.t_c.start()
       
   303 
       
   304     def test01_basic_replication(self) :
       
   305         self.basic_rep_threading()
       
   306 
       
   307         # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
       
   308         # is not generated if the master has no new transactions.
       
   309         # This is solved in BDB 4.6 (#15542).
       
   310         import time
       
   311         timeout = time.time()+10
       
   312         while (time.time()<timeout) and not (self.confirmed_master and
       
   313                 self.client_startupdone) :
       
   314             time.sleep(0.02)
       
   315         self.assertTrue(time.time()<timeout)
       
   316 
       
   317         self.dbMaster=db.DB(self.dbenvMaster)
       
   318         txn=self.dbenvMaster.txn_begin()
       
   319         self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
       
   320         txn.commit()
       
   321 
       
   322         import time,os.path
       
   323         timeout=time.time()+10
       
   324         while (time.time()<timeout) and \
       
   325           not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
       
   326             time.sleep(0.01)
       
   327 
       
   328         self.dbClient=db.DB(self.dbenvClient)
       
   329         while True :
       
   330             txn=self.dbenvClient.txn_begin()
       
   331             try :
       
   332                 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
       
   333                         mode=0666, txn=txn)
       
   334             except db.DBRepHandleDeadError :
       
   335                 txn.abort()
       
   336                 self.dbClient.close()
       
   337                 self.dbClient=db.DB(self.dbenvClient)
       
   338                 continue
       
   339 
       
   340             txn.commit()
       
   341             break
       
   342 
       
   343         txn=self.dbenvMaster.txn_begin()
       
   344         self.dbMaster.put("ABC", "123", txn=txn)
       
   345         txn.commit()
       
   346         import time
       
   347         timeout=time.time()+10
       
   348         v=None
       
   349         while (time.time()<timeout) and (v==None) :
       
   350             txn=self.dbenvClient.txn_begin()
       
   351             v=self.dbClient.get("ABC", txn=txn)
       
   352             txn.commit()
       
   353             if v==None :
       
   354                 time.sleep(0.02)
       
   355         self.assertTrue(time.time()<timeout)
       
   356         self.assertEquals("123", v)
       
   357 
       
   358         txn=self.dbenvMaster.txn_begin()
       
   359         self.dbMaster.delete("ABC", txn=txn)
       
   360         txn.commit()
       
   361         timeout=time.time()+10
       
   362         while (time.time()<timeout) and (v!=None) :
       
   363             txn=self.dbenvClient.txn_begin()
       
   364             v=self.dbClient.get("ABC", txn=txn)
       
   365             txn.commit()
       
   366             if v==None :
       
   367                 time.sleep(0.02)
       
   368         self.assertTrue(time.time()<timeout)
       
   369         self.assertEquals(None, v)
       
   370 
       
   371     if db.version() >= (4,7) :
       
   372         def test02_test_request(self) :
       
   373             self.basic_rep_threading()
       
   374             (minimum, maximum) = self.dbenvClient.rep_get_request()
       
   375             self.dbenvClient.rep_set_request(minimum-1, maximum+1)
       
   376             self.assertEqual(self.dbenvClient.rep_get_request(),
       
   377                     (minimum-1, maximum+1))
       
   378 
       
   379     if db.version() >= (4,6) :
       
   380         def test03_master_election(self) :
       
   381             # Get ready to hold an election
       
   382             #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
       
   383             self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
       
   384             self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
       
   385 
       
   386             def thread_do(env, q, envid, election_status, must_be_master) :
       
   387                 while True :
       
   388                     v=q.get()
       
   389                     if v == None : return
       
   390                     r = env.rep_process_message(v[0],v[1],envid)
       
   391                     if must_be_master and self.confirmed_master :
       
   392                         self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
       
   393                         must_be_master = False
       
   394 
       
   395                     if r[0] == db.DB_REP_HOLDELECTION :
       
   396                         def elect() :
       
   397                             while True :
       
   398                                 try :
       
   399                                     env.rep_elect(2, 1)
       
   400                                     election_status[0] = False
       
   401                                     break
       
   402                                 except db.DBRepUnavailError :
       
   403                                     pass
       
   404                         if not election_status[0] and not self.confirmed_master :
       
   405                             from threading import Thread
       
   406                             election_status[0] = True
       
   407                             t=Thread(target=elect)
       
   408                             import sys
       
   409                             if sys.version_info[0] < 3 :
       
   410                                 t.setDaemon(True)
       
   411                             else :
       
   412                                 t.daemon = True
       
   413                             t.start()
       
   414 
       
   415             self.thread_do = thread_do
       
   416 
       
   417             self.t_m.start()
       
   418             self.t_c.start()
       
   419 
       
   420             self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
       
   421             self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
       
   422             self.client_doing_election[0] = True
       
   423             while True :
       
   424                 try :
       
   425                     self.dbenvClient.rep_elect(2, 1)
       
   426                     self.client_doing_election[0] = False
       
   427                     break
       
   428                 except db.DBRepUnavailError :
       
   429                     pass
       
   430 
       
   431             self.assertTrue(self.confirmed_master)
       
   432 
       
   433 #----------------------------------------------------------------------
       
   434 
       
   435 def test_suite():
       
   436     suite = unittest.TestSuite()
       
   437     if db.version() >= (4, 6) :
       
   438         dbenv = db.DBEnv()
       
   439         try :
       
   440             dbenv.repmgr_get_ack_policy()
       
   441             ReplicationManager_available=True
       
   442         except :
       
   443             ReplicationManager_available=False
       
   444         dbenv.close()
       
   445         del dbenv
       
   446         if ReplicationManager_available :
       
   447             suite.addTest(unittest.makeSuite(DBReplicationManager))
       
   448 
       
   449         if have_threads :
       
   450             suite.addTest(unittest.makeSuite(DBBaseReplication))
       
   451 
       
   452     return suite
       
   453 
       
   454 
       
   455 if __name__ == '__main__':
       
   456     unittest.main(defaultTest='test_suite')