[csw-devel] SF.net SVN: gar:[6635] csw/mgar/pkg/bdb46/trunk

dmichelsen at users.sourceforge.net dmichelsen at users.sourceforge.net
Thu Oct 1 11:52:38 CEST 2009


Revision: 6635
          http://gar.svn.sourceforge.net/gar/?rev=6635&view=rev
Author:   dmichelsen
Date:     2009-10-01 09:52:37 +0000 (Thu, 01 Oct 2009)

Log Message:
-----------
bdb46: Initial commit

Added Paths:
-----------
    csw/mgar/pkg/bdb46/trunk/Makefile
    csw/mgar/pkg/bdb46/trunk/checksums
    csw/mgar/pkg/bdb46/trunk/files/
    csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.1
    csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.2
    csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.3
    csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.4

Property Changed:
----------------
    csw/mgar/pkg/bdb46/trunk/


Property changes on: csw/mgar/pkg/bdb46/trunk
___________________________________________________________________
Modified: svn:externals
   - gar https://gar.svn.sourceforge.net/svnroot/gar/csw/mgar/gar/v1

   + gar https://gar.svn.sourceforge.net/svnroot/gar/csw/mgar/gar/v2


Copied: csw/mgar/pkg/bdb46/trunk/Makefile (from rev 6627, csw/mgar/pkg/bdb47/trunk/Makefile)
===================================================================
--- csw/mgar/pkg/bdb46/trunk/Makefile	                        (rev 0)
+++ csw/mgar/pkg/bdb46/trunk/Makefile	2009-10-01 09:52:37 UTC (rev 6635)
@@ -0,0 +1,88 @@
+GARNAME = db
+GARVERSION = 4.6.21
+CATEGORIES = lib
+
+DESCRIPTION = Berkeley DB 4.6
+define BLURB
+  Berkeley DB (libdb) is a programmatic toolkit that provides embedded database
+  support for both traditional and client/server applications. It includes
+  b+tree, queue, extended linear hashing, fixed, and variable-length record
+  access methods, transactions, locking, logging, shared memory caching and
+  database recovery. DB supports C, C++, Java, and Perl APIs. It is available
+  for a wide variety of UNIX platforms as well as Windows NT and Windows 95
+  (MSVC 4, 5 and 6).
+endef
+
+MASTER_SITES = http://download.oracle.com/berkeley-db/
+DISTFILES = $(GARNAME)-$(GARVERSION).tar.gz 
+
+# We define upstream file regex so we can be notifed of new upstream software release
+UFILES_REGEX = $(GARNAME)-(\d+(?:\.\d+)*).tar.gz
+
+SPKG_SOURCEURL = http://www.oracle.com/technology/software/products/berkeley-db/db/index.html
+
+WORKSRC = $(WORKDIR)/$(GARNAME)-$(GARVERSION)/build_unix
+
+PATCHDIR = $(WORKSRC)/..
+PATCHDIRLEVEL = 0
+PATCHFILES += $(notdir $(wildcard $(FILEDIR)/patch.*))
+
+BUILD64 = 1
+CONFIGURE_SCRIPTS = dist
+
+prefix = $(BUILD_PREFIX)/bdb46
+CONFIGURE_ARGS  = $(DIRPATHS)
+CONFIGURE_ARGS += --enable-compat185
+CONFIGURE_ARGS += --enable-o_direct
+CONFIGURE_ARGS += --enable-rpc
+CONFIGURE_ARGS += --enable-cxx
+CONFIGURE_ARGS += --enable-java
+
+# Exclude TCL support for 64 bit until we have a 64 bit TCL
+CONFIGURE_ARGS-mm-32 = --enable-tcl --with-tcl=$(libpath)
+CONFIGURE_ARGS += $(CONFIGURE_ARGS-mm-$(MEMORYMODEL))
+
+# bdb tests are *very* time consuming
+TEST_SCRIPTS =
+
+NO_ISAEXEC = 1
+
+PACKAGES = CSWbdb46 CSWbdb46devel CSWbdb46doc
+
+CATALOGNAME_CSWbdb46      = berkeleydb46
+CATALOGNAME_CSWbdb46devel = berkeleydb46_devel
+CATALOGNAME_CSWbdb46doc   = berkeleydb46_doc
+
+# Remove the license from share/doc/berkeleydb/license/.*
+# because GAR expects license to be a file instead of a directory
+EXTRA_MERGE_EXCLUDE_FILES = .*/license.*
+
+EXTRA_PAX_ARGS += -s ',^\.$(prefix)/docs,.$(BUILD_PREFIX)/share/doc/$(CATALOGNAME_CSWbdb46),'
+EXTRA_MERGE_EXCLUDE_FILES_isa-sparcv9 = .*/docs.*
+EXTRA_MERGE_EXCLUDE_FILES_isa-amd64 = .*/docs.*
+
+ARCHALL_CSWbdbdoc = 1
+
+SPKG_DESC_CSWbdb46      = BerkeleyDB 4.6 embedded database libraries and utilities
+SPKG_DESC_CSWbdb46devel = BerkeleyDB 4.6 development support
+SPKG_DESC_CSWbdb46doc   = BerkeleyDB 4.6 documentation
+
+REQUIRED_PKGS_CSWbdb46devel = CSWbdb46
+
+LICENSE = LICENSE
+
+PKGFILES_CSWbdb46doc = /opt/csw/share/doc/.*
+
+PKGFILES_CSWbdb46devel = $(PKGFILES_DEVEL)
+
+include gar/category.mk
+
+LIBS += -lnsl
+export LIBS
+
+PATH := /usr/jdk1.6.0_07/bin:$(PATH)
+export PATH
+
+configure-dist:
+	@( cd $(WORKSRC) ; $(CONFIGURE_ENV) ../dist/configure $(CONFIGURE_ARGS) )
+	@$(MAKECOOKIE)

Added: csw/mgar/pkg/bdb46/trunk/checksums
===================================================================
--- csw/mgar/pkg/bdb46/trunk/checksums	                        (rev 0)
+++ csw/mgar/pkg/bdb46/trunk/checksums	2009-10-01 09:52:37 UTC (rev 6635)
@@ -0,0 +1,5 @@
+718082e7e35fc48478a2334b0bc4cd11  download/db-4.6.21.tar.gz
+4878872edfc53c6ecb871b1062a4bdaf  download/patch.4.6.21.1
+55074e53d3acae2dcbeae8322f96e522  download/patch.4.6.21.2
+7a6e11b54712caf752f9c4a52babe60c  download/patch.4.6.21.3
+ae7d3d587355fe85b512ef09b9a77d19  download/patch.4.6.21.4

Added: csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.1
===================================================================
--- csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.1	                        (rev 0)
+++ csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.1	2009-10-01 09:52:37 UTC (rev 6635)
@@ -0,0 +1,90 @@
+*** dbinc/mp.h	2007-09-28 01:28:25.000000000 +1000
+--- dbinc/mp.h	2008-02-14 01:22:09.000000000 +1100
+***************
+*** 639,644 ****
+--- 639,647 ----
+   */
+  #define	MP_TRUNC_RECOVER	0x01
+  
++ /* Private flags to DB_MPOOLFILE->close. */
++ #define	DB_MPOOL_NOLOCK		0x002	/* Already have mpf locked. */
++ 
+  #if defined(__cplusplus)
+  }
+  #endif
+*** mp/mp_fopen.c	2007-05-18 03:18:01.000000000 +1000
+--- mp/mp_fopen.c	2008-02-12 16:09:42.000000000 +1100
+***************
+*** 888,894 ****
+  	 * when we try to flush them.
+  	 */
+  	deleted = 0;
+! 	MUTEX_LOCK(dbenv, mfp->mutex);
+  	if (F_ISSET(dbmfp, MP_MULTIVERSION))
+  		--mfp->multiversion;
+  	if (--mfp->mpf_cnt == 0 || LF_ISSET(DB_MPOOL_DISCARD)) {
+--- 888,895 ----
+  	 * when we try to flush them.
+  	 */
+  	deleted = 0;
+! 	if (!LF_ISSET(DB_MPOOL_NOLOCK))
+! 		MUTEX_LOCK(dbenv, mfp->mutex);
+  	if (F_ISSET(dbmfp, MP_MULTIVERSION))
+  		--mfp->multiversion;
+  	if (--mfp->mpf_cnt == 0 || LF_ISSET(DB_MPOOL_DISCARD)) {
+***************
+*** 909,921 ****
+  			}
+  		}
+  		if (mfp->block_cnt == 0) {
+  			if ((t_ret =
+  			    __memp_mf_discard(dbmp, mfp)) != 0 && ret == 0)
+  				ret = t_ret;
+  			deleted = 1;
+  		}
+  	}
+! 	if (!deleted)
+  		MUTEX_UNLOCK(dbenv, mfp->mutex);
+  
+  done:	/* Discard the DB_MPOOLFILE structure. */
+--- 910,928 ----
+  			}
+  		}
+  		if (mfp->block_cnt == 0) {
++ 			/*
++ 			 * We should never discard this mp file if our caller
++ 			 * is holding the lock on it.  See comment in
++ 			 * __memp_sync_file.
++ 			 */
++ 			DB_ASSERT(dbenv, !LF_ISSET(DB_MPOOL_NOLOCK));
+  			if ((t_ret =
+  			    __memp_mf_discard(dbmp, mfp)) != 0 && ret == 0)
+  				ret = t_ret;
+  			deleted = 1;
+  		}
+  	}
+! 	if (!deleted && !LF_ISSET(DB_MPOOL_NOLOCK))
+  		MUTEX_UNLOCK(dbenv, mfp->mutex);
+  
+  done:	/* Discard the DB_MPOOLFILE structure. */
+*** mp/mp_sync.c	2007-06-02 04:32:44.000000000 +1000
+--- mp/mp_sync.c	2008-02-12 16:09:42.000000000 +1100
+***************
+*** 755,761 ****
+  	 * This is important since we are called with the hash bucket
+  	 * locked.  The mfp will get freed via the cleanup pass.
+  	 */
+! 	if (dbmfp != NULL && (t_ret = __memp_fclose(dbmfp, 0)) != 0 && ret == 0)
+  		ret = t_ret;
+  
+  	--mfp->mpf_cnt;
+--- 755,762 ----
+  	 * This is important since we are called with the hash bucket
+  	 * locked.  The mfp will get freed via the cleanup pass.
+  	 */
+! 	if (dbmfp != NULL &&
+! 	    (t_ret = __memp_fclose(dbmfp, DB_MPOOL_NOLOCK)) != 0 && ret == 0)
+  		ret = t_ret;
+  
+  	--mfp->mpf_cnt;
+

Added: csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.2
===================================================================
--- csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.2	                        (rev 0)
+++ csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.2	2009-10-01 09:52:37 UTC (rev 6635)
@@ -0,0 +1,27 @@
+*** mp/mp_region.c	2007-05-18 03:18:01.000000000 +1000
+--- mp/mp_region.c	2008-06-24 13:15:56.000000000 +1000
+***************
+*** 249,256 ****
+  		mtx_base = htab[0].mtx_hash;
+  	}
+  
+  	if (mtx_base != MUTEX_INVALID)
+! 		mtx_base += reginfo_off * htab_buckets;
+  
+  	/* Allocate hash table space and initialize it. */
+  	if ((ret = __env_alloc(infop,
+--- 249,262 ----
+  		mtx_base = htab[0].mtx_hash;
+  	}
+  
++ 	/*
++ 	 * We preallocated all of the mutexes in a block, so for regions after
++ 	 * the first, we skip mutexes in use in earlier regions.  Each region
++ 	 * has the same number of buckets and there are two mutexes per hash
++ 	 * bucket (the bucket mutex and the I/O mutex).
++ 	 */
+  	if (mtx_base != MUTEX_INVALID)
+! 		mtx_base += reginfo_off * htab_buckets * 2;
+  
+  	/* Allocate hash table space and initialize it. */
+  	if ((ret = __env_alloc(infop,

Added: csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.3
===================================================================
--- csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.3	                        (rev 0)
+++ csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.3	2009-10-01 09:52:37 UTC (rev 6635)
@@ -0,0 +1,74 @@
+*** sequence/sequence.c
+--- sequence/sequence.c
+***************
+*** 196,202 ****
+  	if ((ret = __db_get_flags(dbp, &tflags)) != 0)
+  		goto err;
+  
+! 	if (DB_IS_READONLY(dbp)) {
+  		ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open");
+  		goto err;
+  	}
+--- 196,206 ----
+  	if ((ret = __db_get_flags(dbp, &tflags)) != 0)
+  		goto err;
+  
+! 	/*
+! 	 * We can let replication clients open sequences, but must
+! 	 * check later that they do not update them.
+! 	 */
+! 	if (F_ISSET(dbp, DB_AM_RDONLY)) {
+  		ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open");
+  		goto err;
+  	}
+***************
+*** 252,257 ****
+--- 256,266 ----
+  		if ((ret != DB_NOTFOUND && ret != DB_KEYEMPTY) ||
+  		    !LF_ISSET(DB_CREATE))
+  			goto err;
++ 		if (IS_REP_CLIENT(dbenv) &&
++ 		    !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
++ 			ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
++ 			goto err;
++ 		}
+  		ret = 0;
+  
+  		rp = &seq->seq_record;
+***************
+*** 304,310 ****
+  	 */
+  	rp = seq->seq_data.data;
+  	if (rp->seq_version == DB_SEQUENCE_OLDVER) {
+! oldver:		rp->seq_version = DB_SEQUENCE_VERSION;
+  		if (__db_isbigendian()) {
+  			if (IS_DB_AUTO_COMMIT(dbp, txn)) {
+  				if ((ret =
+--- 313,324 ----
+  	 */
+  	rp = seq->seq_data.data;
+  	if (rp->seq_version == DB_SEQUENCE_OLDVER) {
+! oldver:		if (IS_REP_CLIENT(dbenv) &&
+! 		    !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
+! 			ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
+! 			goto err;
+! 		}
+! 		rp->seq_version = DB_SEQUENCE_VERSION;
+  		if (__db_isbigendian()) {
+  			if (IS_DB_AUTO_COMMIT(dbp, txn)) {
+  				if ((ret =
+***************
+*** 713,718 ****
+--- 727,738 ----
+  
+  	MUTEX_LOCK(dbenv, seq->mtx_seq);
+  
++ 	if (handle_check && IS_REP_CLIENT(dbenv) &&
++ 	    !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
++ 		ret = __db_rdonly(dbenv, "DB_SEQUENCE->get");
++ 		goto err;
++ 	}
++ 
+  	if (rp->seq_min + delta > rp->seq_max) {
+  		__db_errx(dbenv, "Sequence overflow");
+  		ret = EINVAL;

Added: csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.4
===================================================================
--- csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.4	                        (rev 0)
+++ csw/mgar/pkg/bdb46/trunk/files/patch.4.6.21.4	2009-10-01 09:52:37 UTC (rev 6635)
@@ -0,0 +1,1414 @@
+*** dbinc/repmgr.h	2007-10-31 10:23:52.000000000 -0700
+--- dbinc/repmgr.h	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 36,41 ****
+--- 36,55 ----
+  #endif
+  
+  /*
++  * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
++  * a queue per connection, waiting for TCP buffer space to become available in
++  * the kernel.  Rather than exceeding this limit, we simply discard additional
++  * messages (since this is always allowed by the replication protocol).
++  *    As a special dispensation, if a message is destined for a specific remote
++  * site (i.e., it's not a broadcast), then we first try blocking the sending
++  * thread, waiting for space to become available (though we only wait a limited
++  * time).  This is so as to be able to handle the immediate flood of (a
++  * potentially large number of) outgoing messages that replication generates, in
++  * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
++  */
++ #define	OUT_QUEUE_LIMIT	10
++ 
++ /*
+   * The system value is available from sysconf(_SC_HOST_NAME_MAX).
+   * Historically, the maximum host name was 256.
+   */
+***************
+*** 47,52 ****
+--- 61,71 ----
+  #define	MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
+  typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
+  
++ /* Default timeout values, in seconds. */
++ #define	DB_REPMGR_DEFAULT_ACK_TIMEOUT		(1 * US_PER_SEC)
++ #define	DB_REPMGR_DEFAULT_CONNECTION_RETRY	(30 * US_PER_SEC)
++ #define	DB_REPMGR_DEFAULT_ELECTION_RETRY	(10 * US_PER_SEC)
++ 
+  struct __repmgr_connection;
+      typedef struct __repmgr_connection REPMGR_CONNECTION;
+  struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
+***************
+*** 171,178 ****
+  #ifdef DB_WIN32
+  	WSAEVENT event_object;
+  #endif
+! #define	CONN_CONNECTING	0x01	/* nonblocking connect in progress */
+! #define	CONN_DEFUNCT	0x02	/* socket close pending */
+  	u_int32_t flags;
+  
+  	/*
+--- 190,198 ----
+  #ifdef DB_WIN32
+  	WSAEVENT event_object;
+  #endif
+! #define	CONN_CONGESTED	0x01	/* msg thread wait has exceeded timeout */
+! #define	CONN_CONNECTING	0x02	/* nonblocking connect in progress */
+! #define	CONN_DEFUNCT	0x04	/* socket close pending */
+  	u_int32_t flags;
+  
+  	/*
+***************
+*** 180,189 ****
+  	 * send() function's thread.  But if TCP doesn't have enough network
+  	 * buffer space for us when we first try it, we instead allocate some
+  	 * memory, and copy the message, and then send it as space becomes
+! 	 * available in our main select() thread.
+  	 */
+  	OUT_Q_HEADER outbound_queue;
+  	int out_queue_length;
+  
+  	/*
+  	 * Input: while we're reading a message, we keep track of what phase
+--- 200,215 ----
+  	 * send() function's thread.  But if TCP doesn't have enough network
+  	 * buffer space for us when we first try it, we instead allocate some
+  	 * memory, and copy the message, and then send it as space becomes
+! 	 * available in our main select() thread.  In some cases, if the queue
+! 	 * gets too long we wait until it's drained, and then append to it.
+! 	 * This condition variable's associated mutex is the normal per-repmgr
+! 	 * db_rep->mutex, because that mutex is always held anyway whenever the
+! 	 * output queue is consulted.
+  	 */
+  	OUT_Q_HEADER outbound_queue;
+  	int out_queue_length;
++ 	cond_var_t drained;
++ 	int blockers;		/* ref count of msg threads waiting on us */
+  
+  	/*
+  	 * Input: while we're reading a message, we keep track of what phase
+*** dbinc_auto/int_def.in	2007-10-31 10:23:52.000000000 -0700
+--- dbinc_auto/int_def.in	2007-10-31 10:23:52.000000000 -0700
+***************
+*** 1420,1425 ****
+--- 1420,1428 ----
+  #define	__repmgr_wake_waiting_senders __repmgr_wake_waiting_senders at DB_VERSION_UNIQUE_NAME@
+  #define	__repmgr_await_ack __repmgr_await_ack at DB_VERSION_UNIQUE_NAME@
+  #define	__repmgr_compute_wait_deadline __repmgr_compute_wait_deadline at DB_VERSION_UNIQUE_NAME@
++ #define	__repmgr_await_drain __repmgr_await_drain at DB_VERSION_UNIQUE_NAME@
++ #define	__repmgr_alloc_cond __repmgr_alloc_cond at DB_VERSION_UNIQUE_NAME@
++ #define	__repmgr_free_cond __repmgr_free_cond at DB_VERSION_UNIQUE_NAME@
+  #define	__repmgr_init_sync __repmgr_init_sync at DB_VERSION_UNIQUE_NAME@
+  #define	__repmgr_close_sync __repmgr_close_sync at DB_VERSION_UNIQUE_NAME@
+  #define	__repmgr_net_init __repmgr_net_init at DB_VERSION_UNIQUE_NAME@
+*** dbinc_auto/repmgr_ext.h	2007-10-31 10:23:52.000000000 -0700
+--- dbinc_auto/repmgr_ext.h	2007-10-31 10:23:52.000000000 -0700
+***************
+*** 21,30 ****
+  int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
+  void __repmgr_stash_generation __P((DB_ENV *));
+  int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
+  int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
+! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+  int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+  int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
+  int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
+--- 21,30 ----
+  int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
+  void __repmgr_stash_generation __P((DB_ENV *));
+  int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
+  int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+  int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+  int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
+  int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
+***************
+*** 39,44 ****
+--- 39,47 ----
+  int __repmgr_wake_waiting_senders __P((DB_ENV *));
+  int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
+  void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
++ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
++ int __repmgr_alloc_cond __P((cond_var_t *));
++ int __repmgr_free_cond __P((cond_var_t *));
+  int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
+  int __repmgr_close_sync __P((DB_ENV *));
+  int __repmgr_net_init __P((DB_ENV *, DB_REP *));
+*** repmgr/repmgr_method.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_method.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 196,204 ****
+  	int ret;
+  
+  	/* Set some default values. */
+! 	db_rep->ack_timeout = 1 * US_PER_SEC;			/*  1 second */
+! 	db_rep->connection_retry_wait = 30 * US_PER_SEC;	/* 30 seconds */
+! 	db_rep->election_retry_wait = 10 * US_PER_SEC;		/* 10 seconds */
+  	db_rep->config_nsites = 0;
+  	db_rep->peer = DB_EID_INVALID;
+  	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+--- 196,204 ----
+  	int ret;
+  
+  	/* Set some default values. */
+! 	db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
+! 	db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
+! 	db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
+  	db_rep->config_nsites = 0;
+  	db_rep->peer = DB_EID_INVALID;
+  	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+***************
+*** 238,243 ****
+--- 238,244 ----
+  	DB_ENV *dbenv;
+  {
+  	DB_REP *db_rep;
++ 	REPMGR_CONNECTION *conn;
+  	int ret;
+  
+  	db_rep = dbenv->rep_handle;
+***************
+*** 254,259 ****
+--- 255,266 ----
+  
+  	if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
+  		goto unlock;
++ 
++ 	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ 		if (conn->blockers > 0 &&
++ 		    ((ret = __repmgr_signal(&conn->drained)) != 0))
++ 			goto unlock;
++ 	}
+  	UNLOCK_MUTEX(db_rep->mutex);
+  
+  	return (__repmgr_wake_main_thread(dbenv));
+*** repmgr/repmgr_msg.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_msg.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 183,192 ****
+  
+  /*
+   * Acknowledges a message.
+-  *
+-  * !!!
+-  * Note that this cannot be called from the select() thread, in case we call
+-  * __repmgr_bust_connection(..., FALSE).
+   */
+  static int
+  ack_message(dbenv, generation, lsn)
+--- 183,188 ----
+***************
+*** 227,235 ****
+  		rec2.size = 0;
+  
+  		conn = site->ref.conn;
+  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+! 		    &control2, &rec2)) == DB_REP_UNAVAIL)
+! 			ret = __repmgr_bust_connection(dbenv, conn, FALSE);
+  	}
+  
+  	UNLOCK_MUTEX(db_rep->mutex);
+--- 223,236 ----
+  		rec2.size = 0;
+  
+  		conn = site->ref.conn;
++ 		/*
++ 		 * It's hard to imagine anyone would care about a lost ack if
++ 		 * the path to the master is so congested as to need blocking;
++ 		 * so pass "blockable" argument as FALSE.
++ 		 */
+  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+! 		    &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
+! 			ret = __repmgr_bust_connection(dbenv, conn);
+  	}
+  
+  	UNLOCK_MUTEX(db_rep->mutex);
+*** repmgr/repmgr_net.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_net.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 63,69 ****
+  static void setup_sending_msg
+      __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+  static int __repmgr_send_internal
+!     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
+  static int enqueue_msg
+      __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+  static int flatten __P((DB_ENV *, struct sending_msg *));
+--- 63,69 ----
+  static void setup_sending_msg
+      __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+  static int __repmgr_send_internal
+!     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
+  static int enqueue_msg
+      __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+  static int flatten __P((DB_ENV *, struct sending_msg *));
+***************
+*** 73,85 ****
+   * __repmgr_send --
+   *	The send function for DB_ENV->rep_set_transport.
+   *
+-  * !!!
+-  * This is only ever called as the replication transport call-back, which means
+-  * it's either on one of our message processing threads or an application
+-  * thread.  It mustn't be called from the select() thread, because we might call
+-  * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
+-  * select() thread.
+-  *
+   * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
+   * PUBLIC:     const DB_LSN *, int, u_int32_t));
+   */
+--- 73,78 ----
+***************
+*** 126,134 ****
+  		}
+  
+  		conn = site->ref.conn;
+  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+! 		    control, rec)) == DB_REP_UNAVAIL &&
+! 		    (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
+  			ret = t_ret;
+  		if (ret != 0)
+  			goto out;
+--- 119,128 ----
+  		}
+  
+  		conn = site->ref.conn;
++ 		/* Pass the "blockable" argument as TRUE. */
+  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+! 		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
+! 		    (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+  			ret = t_ret;
+  		if (ret != 0)
+  			goto out;
+***************
+*** 222,228 ****
+  	if (site->state != SITE_CONNECTED)
+  		return (NULL);
+  
+! 	if (F_ISSET(site->ref.conn, CONN_CONNECTING))
+  		return (NULL);
+  	return (site);
+  }
+--- 216,222 ----
+  	if (site->state != SITE_CONNECTED)
+  		return (NULL);
+  
+! 	if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
+  		return (NULL);
+  	return (site);
+  }
+***************
+*** 235,244 ****
+   *
+   * !!!
+   * Caller must hold dbenv->mutex.
+-  *
+-  * !!!
+-  * Note that this cannot be called from the select() thread, in case we call
+-  * __repmgr_bust_connection(..., FALSE).
+   */
+  static int
+  __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
+--- 229,234 ----
+***************
+*** 268,281 ****
+  		    !IS_VALID_EID(conn->eid))
+  			continue;
+  
+! 		if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
+  			site = SITE_FROM_EID(conn->eid);
+  			nsites++;
+  			if (site->priority > 0)
+  				npeers++;
+  		} else if (ret == DB_REP_UNAVAIL) {
+! 			if ((ret = __repmgr_bust_connection(
+! 			     dbenv, conn, FALSE)) != 0)
+  				return (ret);
+  		} else
+  			return (ret);
+--- 258,277 ----
+  		    !IS_VALID_EID(conn->eid))
+  			continue;
+  
+! 		/*
+! 		 * Broadcast messages are either application threads committing
+! 		 * transactions, or replication status message that we can
+! 		 * afford to lose.  So don't allow blocking for them (pass
+! 		 * "blockable" argument as FALSE).
+! 		 */
+! 		if ((ret = __repmgr_send_internal(dbenv,
+! 		    conn, &msg, FALSE)) == 0) {
+  			site = SITE_FROM_EID(conn->eid);
+  			nsites++;
+  			if (site->priority > 0)
+  				npeers++;
+  		} else if (ret == DB_REP_UNAVAIL) {
+! 			if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+  				return (ret);
+  		} else
+  			return (ret);
+***************
+*** 301,339 ****
+   * intersperse writes that are part of two single messages.
+   *
+   * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+!  * PUBLIC:    u_int, const DBT *, const DBT *));
+   */
+  int
+! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+  	u_int msg_type;
+  	const DBT *control, *rec;
+  {
+  	struct sending_msg msg;
+  
+  	setup_sending_msg(&msg, msg_type, control, rec);
+! 	return (__repmgr_send_internal(dbenv, conn, &msg));
+  }
+  
+  /*
+   * Attempts a "best effort" to send a message on the given site.  If there is an
+!  * excessive backlog of message already queued on the connection, we simply drop
+!  * this message, and still return 0 even in this case.
+   */
+  static int
+! __repmgr_send_internal(dbenv, conn, msg)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+  	struct sending_msg *msg;
+  {
+! #define	OUT_QUEUE_LIMIT 10	/* arbitrary, for now */
+  	REPMGR_IOVECS iovecs;
+  	SITE_STRING_BUFFER buffer;
+  	int ret;
+  	size_t nw;
+  	size_t total_written;
+  
+  	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+  	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+  		/*
+--- 297,355 ----
+   * intersperse writes that are part of two single messages.
+   *
+   * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+!  * PUBLIC:    u_int, const DBT *, const DBT *, int));
+   */
+  int
+! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+  	u_int msg_type;
+  	const DBT *control, *rec;
++ 	int blockable;
+  {
+  	struct sending_msg msg;
+  
+  	setup_sending_msg(&msg, msg_type, control, rec);
+! 	return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
+  }
+  
+  /*
+   * Attempts a "best effort" to send a message on the given site.  If there is an
+!  * excessive backlog of message already queued on the connection, what shall we
+!  * do?  If the caller doesn't mind blocking, we'll wait (a limited amount of
+!  * time) for the queue to drain.  Otherwise we'll simply drop the message.  This
+!  * is always allowed by the replication protocol.  But in the case of a
+!  * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
+!  * almost always get a flood of messages that instantly fills our queue, so
+!  * blocking improves performance (by avoiding the need for the client to
+!  * re-request).
+!  *
+!  * How long shall we wait?  We could of course create a new timeout
+!  * configuration type, so that the application could set it directly.  But that
+!  * would start to overwhelm the user with too many choices to think about.  We
+!  * already have an ACK timeout, which is the user's estimate of how long it
+!  * should take to send a message to the client, have it be processed, and return
+!  * a message back to us.  We multiply that by the queue size, because that's how
+!  * many messages have to be swallowed up by the client before we're able to
+!  * start sending again (at least to a rough approximation).
+   */
+  static int
+! __repmgr_send_internal(dbenv, conn, msg, blockable)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+  	struct sending_msg *msg;
++ 	int blockable;
+  {
+! 	DB_REP *db_rep;
+  	REPMGR_IOVECS iovecs;
+  	SITE_STRING_BUFFER buffer;
++ 	db_timeout_t drain_to;
+  	int ret;
+  	size_t nw;
+  	size_t total_written;
+  
++ 	db_rep = dbenv->rep_handle;
++ 
+  	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+  	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+  		/*
+***************
+*** 344,358 ****
+  		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
+  		    __repmgr_format_eid_loc(dbenv->rep_handle,
+  		    conn->eid, buffer)));
+  		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
+  			return (enqueue_msg(dbenv, conn, msg, 0));
+  		else {
+  			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
+  			STAT(dbenv->rep_handle->
+  			    region->mstat.st_msgs_dropped++);
+! 			return (0);
+  		}
+  	}
+  
+  	/*
+  	 * Send as much data to the site as we can, without blocking.  Keep
+--- 360,393 ----
+  		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
+  		    __repmgr_format_eid_loc(dbenv->rep_handle,
+  		    conn->eid, buffer)));
++ 		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
++ 		    blockable && !F_ISSET(conn, CONN_CONGESTED)) {
++ 			RPRINT(dbenv, (dbenv,
++ 			    "block msg thread, await queue space"));
++ 
++ 			if ((drain_to = db_rep->ack_timeout) == 0)
++ 				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
++ 			conn->blockers++;
++ 			ret = __repmgr_await_drain(dbenv,
++ 			    conn, drain_to * OUT_QUEUE_LIMIT);
++ 			conn->blockers--;
++ 			if (db_rep->finished)
++ 				return (DB_TIMEOUT);
++ 			if (ret != 0)
++ 				return (ret);
++ 			if (STAILQ_EMPTY(&conn->outbound_queue))
++ 				goto empty;
++ 		}
+  		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
+  			return (enqueue_msg(dbenv, conn, msg, 0));
+  		else {
+  			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
+  			STAT(dbenv->rep_handle->
+  			    region->mstat.st_msgs_dropped++);
+! 			return (blockable ? DB_TIMEOUT : 0);
+  		}
+  	}
++ empty:
+  
+  	/*
+  	 * Send as much data to the site as we can, without blocking.  Keep
+***************
+*** 498,521 ****
+  
+  /*
+   * Abandons a connection, to recover from an error.  Upon entry the conn struct
+!  * must be on the connections list.
+!  *
+!  * If the 'do_close' flag is true, we do the whole job; the clean-up includes
+!  * removing the struct from the list and freeing all its memory, so upon return
+!  * the caller must not refer to it any further.  Otherwise, we merely mark the
+!  * connection for clean-up later by the main thread.
+   *
+   * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
+!  * PUBLIC:     REPMGR_CONNECTION *, int));
+   *
+   * !!!
+   * Caller holds mutex.
+   */
+  int
+! __repmgr_bust_connection(dbenv, conn, do_close)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+- 	int do_close;
+  {
+  	DB_REP *db_rep;
+  	int connecting, ret, eid;
+--- 533,553 ----
+  
+  /*
+   * Abandons a connection, to recover from an error.  Upon entry the conn struct
+!  * must be on the connections list.  For now, just mark it as unusable; it will
+!  * be fully cleaned up in the top-level select thread, as soon as possible.
+   *
+   * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
+!  * PUBLIC:     REPMGR_CONNECTION *));
+   *
+   * !!!
+   * Caller holds mutex.
++  *
++  * Must be idempotent
+   */
+  int
+! __repmgr_bust_connection(dbenv, conn)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+  {
+  	DB_REP *db_rep;
+  	int connecting, ret, eid;
+***************
+*** 526,537 ****
+  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+  	eid = conn->eid;
+  	connecting = F_ISSET(conn, CONN_CONNECTING);
+! 	if (do_close)
+! 		__repmgr_cleanup_connection(dbenv, conn);
+! 	else {
+! 		F_SET(conn, CONN_DEFUNCT);
+! 		conn->eid = -1;
+! 	}
+  
+  	/*
+  	 * When we first accepted the incoming connection, we set conn->eid to
+--- 558,566 ----
+  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+  	eid = conn->eid;
+  	connecting = F_ISSET(conn, CONN_CONNECTING);
+! 
+! 	F_SET(conn, CONN_DEFUNCT);
+! 	conn->eid = -1;
+  
+  	/*
+  	 * When we first accepted the incoming connection, we set conn->eid to
+***************
+*** 557,563 ****
+  			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
+  				return (ret);
+  		}
+! 	} else if (!do_close) {
+  		/*
+  		 * One way or another, make sure the main thread is poked, so
+  		 * that we do the deferred clean-up.
+--- 586,592 ----
+  			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
+  				return (ret);
+  		}
+! 	} else {
+  		/*
+  		 * One way or another, make sure the main thread is poked, so
+  		 * that we do the deferred clean-up.
+***************
+*** 568,577 ****
+  }
+  
+  /*
+!  * PUBLIC: void __repmgr_cleanup_connection
+   * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
+   */
+! void
+  __repmgr_cleanup_connection(dbenv, conn)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+--- 597,610 ----
+  }
+  
+  /*
+!  * PUBLIC: int __repmgr_cleanup_connection
+   * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
++  *
++  * !!!
++  * Idempotent.  This can be called repeatedly as blocking message threads (of
++  * which there could be multiples) wake up in case of error on the connection.
+   */
+! int
+  __repmgr_cleanup_connection(dbenv, conn)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+***************
+*** 580,596 ****
+  	QUEUED_OUTPUT *out;
+  	REPMGR_FLAT *msg;
+  	DBT *dbt;
+  
+  	db_rep = dbenv->rep_handle;
+  
+! 	TAILQ_REMOVE(&db_rep->connections, conn, entries);
+  	if (conn->fd != INVALID_SOCKET) {
+! 		(void)closesocket(conn->fd);
+  #ifdef DB_WIN32
+! 		(void)WSACloseEvent(conn->event_object);
+  #endif
+  	}
+  
+  	/*
+  	 * Deallocate any input and output buffers we may have.
+  	 */
+--- 613,643 ----
+  	QUEUED_OUTPUT *out;
+  	REPMGR_FLAT *msg;
+  	DBT *dbt;
++ 	int ret;
+  
+  	db_rep = dbenv->rep_handle;
+  
+! 	DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
+! 	
+  	if (conn->fd != INVALID_SOCKET) {
+! 		ret = closesocket(conn->fd);
+! 		conn->fd = INVALID_SOCKET;
+! 		if (ret == SOCKET_ERROR) {
+! 			ret = net_errno;
+! 			__db_err(dbenv, ret, "closing socket");
+! 		}
+  #ifdef DB_WIN32
+! 		if (!WSACloseEvent(conn->event_object) && ret != 0)
+! 			ret = net_errno;
+  #endif
++ 		if (ret != 0)
++ 			return (ret);
+  	}
+  
++ 	if (conn->blockers > 0)
++ 		return (__repmgr_signal(&conn->drained));
++ 
++ 	TAILQ_REMOVE(&db_rep->connections, conn, entries);
+  	/*
+  	 * Deallocate any input and output buffers we may have.
+  	 */
+***************
+*** 614,620 ****
+--- 661,669 ----
+  		__os_free(dbenv, out);
+  	}
+  
++ 	ret = __repmgr_free_cond(&conn->drained);
+  	__os_free(dbenv, conn);
++ 	return (ret);
+  }
+  
+  static int
+***************
+*** 1063,1069 ****
+  
+  	while (!TAILQ_EMPTY(&db_rep->connections)) {
+  		conn = TAILQ_FIRST(&db_rep->connections);
+! 		__repmgr_cleanup_connection(dbenv, conn);
+  	}
+  
+  	for (i = 0; i < db_rep->site_cnt; i++) {
+--- 1112,1118 ----
+  
+  	while (!TAILQ_EMPTY(&db_rep->connections)) {
+  		conn = TAILQ_FIRST(&db_rep->connections);
+! 		(void)__repmgr_cleanup_connection(dbenv, conn);
+  	}
+  
+  	for (i = 0; i < db_rep->site_cnt; i++) {
+*** repmgr/repmgr_posix.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_posix.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 21,26 ****
+--- 21,28 ----
+  size_t __repmgr_guesstimated_max = (128 * 1024);
+  #endif
+  
++ static int __repmgr_conn_work __P((DB_ENV *,
++     REPMGR_CONNECTION *, fd_set *, fd_set *, int));
+  static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
+  
+  /*
+***************
+*** 189,194 ****
+--- 191,284 ----
+  }
+  
+  /*
++  * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
++  * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
++  *
++  * Waits for space to become available on the connection's output queue.
++  * Various ways we can exit:
++  *
++  * 1. queue becomes non-full
++  * 2. exceed time limit
++  * 3. connection becomes defunct (due to error in another thread)
++  * 4. repmgr is shutting down
++  * 5. any unexpected system resource failure
++  *
++  * In cases #3 and #5 we return an error code.  Caller is responsible for
++  * distinguishing the remaining cases if desired.
++  * 
++  * !!!
++  * Caller must hold repmgr->mutex.
++  */
++ int
++ __repmgr_await_drain(dbenv, conn, timeout)
++ 	DB_ENV *dbenv;
++ 	REPMGR_CONNECTION *conn;
++ 	db_timeout_t timeout;
++ {
++ 	DB_REP *db_rep;
++ 	struct timespec deadline;
++ 	int ret;
++ 
++ 	db_rep = dbenv->rep_handle;
++ 
++ 	__repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
++ 
++ 	ret = 0;
++ 	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
++ 		ret = pthread_cond_timedwait(&conn->drained,
++ 		    &db_rep->mutex, &deadline);
++ 		switch (ret) {
++ 		case 0:
++ 			if (db_rep->finished)
++ 				goto out; /* #4. */
++ 			/*
++ 			 * Another thread could have stumbled into an error on
++ 			 * the socket while we were waiting.
++ 			 */
++ 			if (F_ISSET(conn, CONN_DEFUNCT)) {
++ 				ret = DB_REP_UNAVAIL; /* #3. */
++ 				goto out;
++ 			}
++ 			break;
++ 		case ETIMEDOUT:
++ 			F_SET(conn, CONN_CONGESTED);
++ 			ret = 0;
++ 			goto out; /* #2. */
++ 		default:
++ 			goto out; /* #5. */
++ 		}
++ 	}
++ 	/* #1. */
++ 
++ out:
++ 	return (ret);
++ }
++ 
++ /*
++  * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
++  *
++  * Initialize a condition variable (in allocated space).
++  */
++ int
++ __repmgr_alloc_cond(c)
++ 	cond_var_t *c;
++ {
++ 	return (pthread_cond_init(c, NULL));
++ }
++ 
++ /*
++  * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
++  *
++  * Clean up a previously initialized condition variable.
++  */
++ int
++ __repmgr_free_cond(c)
++ 	cond_var_t *c;
++ {
++ 	return (pthread_cond_destroy(c));
++ }
++ 
++ /*
+   * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
+   *
+   * Allocate/initialize all data necessary for thread synchronization.  This
+***************
+*** 443,449 ****
+  	REPMGR_RETRY *retry;
+  	db_timespec timeout;
+  	fd_set reads, writes;
+! 	int ret, flow_control, maxfd, nready;
+  	u_int8_t buf[10];	/* arbitrary size */
+  
+  	flow_control = FALSE;
+--- 533,539 ----
+  	REPMGR_RETRY *retry;
+  	db_timespec timeout;
+  	fd_set reads, writes;
+! 	int ret, flow_control, maxfd;
+  	u_int8_t buf[10];	/* arbitrary size */
+  
+  	flow_control = FALSE;
+***************
+*** 477,482 ****
+--- 567,575 ----
+  		 * each one.
+  		 */
+  		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ 			if (F_ISSET(conn, CONN_DEFUNCT))
++ 				continue;
++ 			
+  			if (F_ISSET(conn, CONN_CONNECTING)) {
+  				FD_SET((u_int)conn->fd, &reads);
+  				FD_SET((u_int)conn->fd, &writes);
+***************
+*** 533,616 ****
+  				return (ret);
+  			}
+  		}
+- 		nready = ret;
+- 
+  		LOCK_MUTEX(db_rep->mutex);
+  
+- 		/*
+- 		 * The first priority thing we must do is to clean up any
+- 		 * pending defunct connections.  Otherwise, if they have any
+- 		 * lingering pending input, we get very confused if we try to
+- 		 * process it.
+- 		 *
+- 		 * The TAILQ_FOREACH macro would be suitable here, except that
+- 		 * it doesn't allow unlinking the current element, which is
+- 		 * needed for cleanup_connection.
+- 		 */
+- 		for (conn = TAILQ_FIRST(&db_rep->connections);
+- 		     conn != NULL;
+- 		     conn = next) {
+- 			next = TAILQ_NEXT(conn, entries);
+- 			if (F_ISSET(conn, CONN_DEFUNCT))
+- 				__repmgr_cleanup_connection(dbenv, conn);
+- 		}
+- 
+  		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
+  			goto out;
+- 		if (nready == 0)
+- 			continue;
+  
+  		/*
+! 		 * Traverse the linked list.  (Again, like TAILQ_FOREACH, except
+! 		 * that we need the ability to unlink an element along the way.)
+  		 */
+  		for (conn = TAILQ_FIRST(&db_rep->connections);
+  		     conn != NULL;
+  		     conn = next) {
+  			next = TAILQ_NEXT(conn, entries);
+! 			if (F_ISSET(conn, CONN_CONNECTING)) {
+! 				if (FD_ISSET((u_int)conn->fd, &reads) ||
+! 				    FD_ISSET((u_int)conn->fd, &writes)) {
+! 					if ((ret = finish_connecting(dbenv,
+! 					    conn)) == DB_REP_UNAVAIL) {
+! 						if ((ret =
+! 						    __repmgr_bust_connection(
+! 						    dbenv, conn, TRUE)) != 0)
+! 							goto out;
+! 					} else if (ret != 0)
+! 						goto out;
+! 				}
+! 				continue;
+! 			}
+! 
+! 			/*
+! 			 * Here, the site is connected, and the FD_SET's are
+! 			 * valid.
+! 			 */
+! 			if (FD_ISSET((u_int)conn->fd, &writes)) {
+! 				if ((ret = __repmgr_write_some(
+! 				    dbenv, conn)) == DB_REP_UNAVAIL) {
+! 					if ((ret =
+! 					    __repmgr_bust_connection(dbenv,
+! 					    conn, TRUE)) != 0)
+! 						goto out;
+! 					continue;
+! 				} else if (ret != 0)
+! 					goto out;
+! 			}
+! 
+! 			if (!flow_control &&
+! 			    FD_ISSET((u_int)conn->fd, &reads)) {
+! 				if ((ret = __repmgr_read_from_site(dbenv, conn))
+! 				    == DB_REP_UNAVAIL) {
+! 					if ((ret =
+! 					    __repmgr_bust_connection(dbenv,
+! 					    conn, TRUE)) != 0)
+! 						goto out;
+! 					continue;
+! 				} else if (ret != 0)
+! 					goto out;
+! 			}
+  		}
+  
+  		/*
+--- 626,650 ----
+  				return (ret);
+  			}
+  		}
+  		LOCK_MUTEX(db_rep->mutex);
+  
+  		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
+  			goto out;
+  
+  		/*
+! 		 * Examine each connection, to see what work needs to be done.
+! 		 * 
+! 		 * The TAILQ_FOREACH macro would be suitable here, except that
+! 		 * it doesn't allow unlinking the current element, which is
+! 		 * needed for cleanup_connection.
+  		 */
+  		for (conn = TAILQ_FIRST(&db_rep->connections);
+  		     conn != NULL;
+  		     conn = next) {
+  			next = TAILQ_NEXT(conn, entries);
+! 			if ((ret = __repmgr_conn_work(dbenv,
+! 			    conn, &reads, &writes, flow_control)) != 0)
+! 				goto out;
+  		}
+  
+  		/*
+***************
+*** 637,642 ****
+--- 671,719 ----
+  }
+  
+  static int
++ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
++ 	DB_ENV *dbenv;
++ 	REPMGR_CONNECTION *conn;
++ 	fd_set *reads, *writes;
++ 	int flow_control;
++ {
++ 	int ret;
++ 	u_int fd;
++ 
++ 	if (F_ISSET(conn, CONN_DEFUNCT)) {
++ 		/*
++ 		 * Deferred clean-up, from an error that happened in another
++ 		 * thread, while we were sleeping in select().
++ 		*/
++ 		return (__repmgr_cleanup_connection(dbenv, conn));
++ 	}
++ 
++ 	ret = 0;
++ 	fd = (u_int)conn->fd;
++ 	
++ 	if (F_ISSET(conn, CONN_CONNECTING)) {
++ 		if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
++ 			ret = finish_connecting(dbenv, conn);
++ 	} else {
++ 		/*
++ 		 * Here, the site is connected, and the FD_SET's are valid.
++ 		 */
++ 		if (FD_ISSET(fd, writes))
++ 			ret = __repmgr_write_some(dbenv, conn);
++ 				
++ 		if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
++ 			ret = __repmgr_read_from_site(dbenv, conn);
++ 	}
++ 
++ 	if (ret == DB_REP_UNAVAIL) {
++ 		if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
++ 			return (ret);
++ 		ret = __repmgr_cleanup_connection(dbenv, conn);
++ 	}
++ 	return (ret);
++ }
++ 
++ static int
+  finish_connecting(dbenv, conn)
+  	DB_ENV *dbenv;
+  	REPMGR_CONNECTION *conn;
+***************
+*** 657,662 ****
+--- 734,740 ----
+  		goto err_rpt;
+  	}
+  
++ 	DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
+  	F_CLR(conn, CONN_CONNECTING);
+  	return (__repmgr_send_handshake(dbenv, conn));
+  
+***************
+*** 671,690 ****
+  	    "connecting to %s", __repmgr_format_site_loc(site, buffer));
+  
+  	/* If we've exhausted the list of possible addresses, give up. */
+! 	if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
+  		return (DB_REP_UNAVAIL);
+  
+  	/*
+  	 * This is just like a little mini-"bust_connection", except that we
+  	 * don't reschedule for later, 'cuz we're just about to try again right
+! 	 * now.
+  	 *
+  	 * !!!
+  	 * Which means this must only be called on the select() thread, since
+  	 * only there are we allowed to actually close a connection.
+  	 */
+  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! 	__repmgr_cleanup_connection(dbenv, conn);
+  	ret = __repmgr_connect_site(dbenv, eid);
+  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+  	return (ret);
+--- 749,773 ----
+  	    "connecting to %s", __repmgr_format_site_loc(site, buffer));
+  
+  	/* If we've exhausted the list of possible addresses, give up. */
+! 	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
+! 		STAT(db_rep->region->mstat.st_connect_fail++);
+  		return (DB_REP_UNAVAIL);
++ 	}
+  
+  	/*
+  	 * This is just like a little mini-"bust_connection", except that we
+  	 * don't reschedule for later, 'cuz we're just about to try again right
+! 	 * now.  (Note that we don't have to worry about message threads
+! 	 * blocking on a full output queue: that can't happen when we're only
+! 	 * just connecting.)
+  	 *
+  	 * !!!
+  	 * Which means this must only be called on the select() thread, since
+  	 * only there are we allowed to actually close a connection.
+  	 */
+  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! 	if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
+! 		return (ret);
+  	ret = __repmgr_connect_site(dbenv, eid);
+  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+  	return (ret);
+*** repmgr/repmgr_sel.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_sel.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 36,45 ****
+  
+  /*
+   * PUBLIC: int __repmgr_accept __P((DB_ENV *));
+-  *
+-  * !!!
+-  * Only ever called in the select() thread, since we may call
+-  * __repmgr_bust_connection(..., TRUE).
+   */
+  int
+  __repmgr_accept(dbenv)
+--- 36,41 ----
+***************
+*** 133,139 ****
+  	case 0:
+  		return (0);
+  	case DB_REP_UNAVAIL:
+! 		return (__repmgr_bust_connection(dbenv, conn, TRUE));
+  	default:
+  		return (ret);
+  	}
+--- 129,135 ----
+  	case 0:
+  		return (0);
+  	case DB_REP_UNAVAIL:
+! 		return (__repmgr_bust_connection(dbenv, conn));
+  	default:
+  		return (ret);
+  	}
+***************
+*** 254,263 ****
+   * starting with the "current" element of its address list and trying as many
+   * addresses as necessary until the list is exhausted.
+   *
+-  * !!!
+-  * Only ever called in the select() thread, since we may call
+-  * __repmgr_bust_connection(..., TRUE).
+-  *
+   * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
+   */
+  int
+--- 250,255 ----
+***************
+*** 332,338 ****
+  		case 0:
+  			break;
+  		case DB_REP_UNAVAIL:
+! 			return (__repmgr_bust_connection(dbenv, con, TRUE));
+  		default:
+  			return (ret);
+  		}
+--- 324,330 ----
+  		case 0:
+  			break;
+  		case DB_REP_UNAVAIL:
+! 			return (__repmgr_bust_connection(dbenv, con));
+  		default:
+  			return (ret);
+  		}
+***************
+*** 437,443 ****
+  
+  	DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
+  
+! 	return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
+  }
+  
+  /*
+--- 429,443 ----
+  
+  	DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
+  
+! 	/*
+! 	 * It would of course be disastrous to block the select() thread, so
+! 	 * pass the "blockable" argument as FALSE.  Fortunately blocking should
+! 	 * never be necessary here, because the hand-shake is always the first
+! 	 * thing we send.  Which is a good thing, because it would be almost as
+! 	 * disastrous if we allowed ourselves to drop a handshake.
+! 	 */
+! 	return (__repmgr_send_one(dbenv,
+! 	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
+  }
+  
+  /*
+***************
+*** 854,859 ****
+--- 854,872 ----
+  			conn->out_queue_length--;
+  			if (--msg->ref_count <= 0)
+  				__os_free(dbenv, msg);
++ 
++ 			/*
++ 			 * We've achieved enough movement to free up at least
++ 			 * one space in the outgoing queue.  Wake any message
++ 			 * threads that may be waiting for space.  Clear the
++ 			 * CONGESTED status so that when the queue reaches the
++ 			 * high-water mark again, the filling thread will be
++ 			 * allowed to try waiting again.
++ 			 */
++ 			F_CLR(conn, CONN_CONGESTED);
++ 			if (conn->blockers > 0 &&
++ 			    (ret = __repmgr_signal(&conn->drained)) != 0)
++ 				return (ret);
+  		}
+  	}
+  
+*** repmgr/repmgr_util.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_util.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 103,108 ****
+--- 103,113 ----
+  	db_rep = dbenv->rep_handle;
+  	if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
+  		return (ret);
++ 	if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
++ 		__os_free(dbenv, c);
++ 		return (ret);
++ 	}
++ 	c->blockers = 0;
+  
+  	c->fd = s;
+  	c->flags = flags;
+*** repmgr/repmgr_windows.c	2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_windows.c	2007-10-31 10:23:53.000000000 -0700
+***************
+*** 11,16 ****
+--- 11,19 ----
+  #define	__INCLUDE_NETWORKING	1
+  #include "db_int.h"
+  
++ /* Convert time-out from microseconds to milliseconds, rounding up. */
++ #define	DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
++ 
+  typedef struct __ack_waiter {
+  	HANDLE event;
+  	const DB_LSN *lsnp;
+***************
+*** 120,136 ****
+  {
+  	DB_REP *db_rep;
+  	ACK_WAITER *me;
+! 	DWORD ret;
+! 	DWORD timeout;
+  
+  	db_rep = dbenv->rep_handle;
+  
+  	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
+  		goto err;
+  
+- 	/* convert time-out from microseconds to milliseconds, rounding up */
+  	timeout = db_rep->ack_timeout > 0 ?
+! 	    ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
+  	me->lsnp = lsnp;
+  	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
+  	    FALSE)) == WAIT_FAILED) {
+--- 123,137 ----
+  {
+  	DB_REP *db_rep;
+  	ACK_WAITER *me;
+! 	DWORD ret, timeout;
+  
+  	db_rep = dbenv->rep_handle;
+  
+  	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
+  		goto err;
+  
+  	timeout = db_rep->ack_timeout > 0 ?
+! 	    DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
+  	me->lsnp = lsnp;
+  	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
+  	    FALSE)) == WAIT_FAILED) {
+***************
+*** 211,216 ****
+--- 212,296 ----
+  	db_rep->waiters->first_free = slot;
+  }
+  
++ /* (See requirements described in repmgr_posix.c.) */
++ int
++ __repmgr_await_drain(dbenv, conn, timeout)
++ 	DB_ENV *dbenv;
++ 	REPMGR_CONNECTION *conn;
++ 	db_timeout_t timeout;
++ {
++ 	DB_REP *db_rep;
++ 	db_timespec deadline, delta, now;
++ 	db_timeout_t t;
++ 	DWORD duration, ret;
++ 	int round_up;
++ 	
++ 	db_rep = dbenv->rep_handle;
++ 	
++ 	__os_gettime(dbenv, &deadline);
++ 	DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
++ 	timespecadd(&deadline, &delta);
++ 
++ 	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
++ 		if (!ResetEvent(conn->drained))
++ 			return (GetLastError());
++ 
++ 		/* How long until the deadline? */
++ 		__os_gettime(dbenv, &now);
++ 		if (timespeccmp(&now, &deadline, >=)) {
++ 			F_SET(conn, CONN_CONGESTED);
++ 			return (0);
++ 		}
++ 		delta = deadline;
++ 		timespecsub(&delta, &now);
++ 		round_up = TRUE;
++ 		DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
++ 		duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
++ 
++ 		ret = SignalObjectAndWait(db_rep->mutex,
++ 		    conn->drained, duration, FALSE);
++ 		LOCK_MUTEX(db_rep->mutex);
++ 		if (ret == WAIT_FAILED)
++ 			return (GetLastError());
++ 		else if (ret == WAIT_TIMEOUT) {
++ 			F_SET(conn, CONN_CONGESTED);
++ 			return (0);
++ 		} else
++ 			DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
++ 		
++ 		if (db_rep->finished)
++ 			return (0);
++ 		if (F_ISSET(conn, CONN_DEFUNCT))
++ 			return (DB_REP_UNAVAIL);
++ 	}
++ 	return (0);
++ }
++ 
++ /*
++  * Creates a manual reset event, which is usually our best choice when we may
++  * have multiple threads waiting on a single event.
++  */
++ int
++ __repmgr_alloc_cond(c)
++ 	cond_var_t *c;
++ {
++ 	HANDLE event;
++ 
++ 	if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
++ 		return (GetLastError());
++ 	*c = event;
++ 	return (0);
++ }
++ 
++ int
++ __repmgr_free_cond(c)
++ 	cond_var_t *c;
++ {
++ 	if (CloseHandle(*c))
++ 		return (0);
++ 	return (GetLastError());
++ }
++ 
+  /*
+   * Make resource allocation an all-or-nothing affair, outside of this and the
+   * close_sync function.  db_rep->waiters should be non-NULL iff all of these
+***************
+*** 488,493 ****
+--- 568,576 ----
+  		 * don't hurt anything flow-control-wise.
+  		 */
+  		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ 			if (F_ISSET(conn, CONN_DEFUNCT))
++ 				continue;
++ 
+  			if (F_ISSET(conn, CONN_CONNECTING) ||
+  			    !STAILQ_EMPTY(&conn->outbound_queue) ||
+  			    (!flow_control || !IS_VALID_EID(conn->eid))) {
+***************
+*** 534,541 ****
+  		     conn != NULL;
+  		     conn = next) {
+  			next = TAILQ_NEXT(conn, entries);
+! 			if (F_ISSET(conn, CONN_DEFUNCT))
+! 				__repmgr_cleanup_connection(dbenv, conn);
+  		}
+  
+  		/*
+--- 617,626 ----
+  		     conn != NULL;
+  		     conn = next) {
+  			next = TAILQ_NEXT(conn, entries);
+! 			if (F_ISSET(conn, CONN_DEFUNCT) &&
+! 			    (ret = __repmgr_cleanup_connection(dbenv,
+! 			    conn)) != 0)
+! 				goto unlock;
+  		}
+  
+  		/*
+***************
+*** 587,597 ****
+  	return (ret);
+  }
+  
+- /*
+-  * !!!
+-  * Only ever called on the select() thread, since we may call
+-  * __repmgr_bust_connection(..., TRUE).
+-  */
+  static int
+  handle_completion(dbenv, conn)
+  	DB_ENV *dbenv;
+--- 672,677 ----
+***************
+*** 651,660 ****
+  		}
+  	}
+  
+! 	return (0);
+! 
+! err:	if (ret == DB_REP_UNAVAIL)
+! 		return (__repmgr_bust_connection(dbenv, conn, TRUE));
+  	return (ret);
+  }
+  
+--- 731,742 ----
+  		}
+  	}
+  
+! err:
+! 	if (ret == DB_REP_UNAVAIL) {
+! 		if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+! 			return (ret);
+! 		ret = __repmgr_cleanup_connection(dbenv, conn);
+! 	}
+  	return (ret);
+  }
+  
+***************
+*** 708,714 ****
+  	}
+  
+  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! 	__repmgr_cleanup_connection(dbenv, conn);
+  	ret = __repmgr_connect_site(dbenv, eid);
+  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+  	return (ret);
+--- 790,797 ----
+  	}
+  
+  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! 	if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
+! 		return (ret);
+  	ret = __repmgr_connect_site(dbenv, eid);
+  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+  	return (ret);


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.



More information about the devel mailing list