diff -cr cvs/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c
*** cvs/src/backend/access/transam/slru.c	2009-05-10 19:49:47.000000000 +0200
--- cvs.build/src/backend/access/transam/slru.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 59,83 ****
  #include "miscadmin.h"
  
  
- /*
-  * Define segment size.  A page is the same BLCKSZ as is used everywhere
-  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
-  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
-  * or 64K transactions for SUBTRANS.
-  *
-  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
-  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
-  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
-  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
-  * take no explicit notice of that fact in this module, except when comparing
-  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
-  *
-  * Note: this file currently assumes that segment file names will be four
-  * hex digits.	This sets a lower bound on the segment size (64K transactions
-  * for 32-bit TransactionIds).
-  */
- #define SLRU_PAGES_PER_SEGMENT	32
- 
  #define SlruFileName(ctl, path, seg) \
  	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
  
--- 59,64 ----
diff -cr cvs/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c
*** cvs/src/backend/access/transam/xact.c	2009-09-06 08:58:59.000000000 +0200
--- cvs.build/src/backend/access/transam/xact.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 1604,1612 ****
  	/* close large objects before lower-level cleanup */
  	AtEOXact_LargeObject(true);
  
- 	/* NOTIFY commit must come before lower-level cleanup */
- 	AtCommit_Notify();
- 
  	/* Prevent cancel/die interrupt while cleaning up */
  	HOLD_INTERRUPTS();
  
--- 1604,1609 ----
***************
*** 1690,1695 ****
--- 1687,1693 ----
  	/* Check we've released all catcache entries */
  	AtEOXact_CatCache(true);
  
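+ 	/* Apply pending LISTEN/UNLISTEN actions and send out any pending NOTIFYs */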
+ 	AtCommit_Notify();
  	AtEOXact_GUC(true, 1);
  	AtEOXact_SPI(true);
  	AtEOXact_on_commit_actions(true);
diff -cr cvs/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile
*** cvs/src/backend/catalog/Makefile	2009-10-31 14:47:46.000000000 +0100
--- cvs.build/src/backend/catalog/Makefile	2009-11-11 01:11:08.000000000 +0100
***************
*** 30,36 ****
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
! 	pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \
  	pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
--- 30,36 ----
  	pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
  	pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
  	pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \
! 	pg_rewrite.h pg_trigger.h pg_description.h pg_cast.h \
  	pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
  	pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
  	pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
diff -cr cvs/src/backend/commands/async.c cvs.build/src/backend/commands/async.c
*** cvs/src/backend/commands/async.c	2009-09-06 08:59:06.000000000 +0200
--- cvs.build/src/backend/commands/async.c	2009-11-11 20:32:20.000000000 +0100
***************
*** 14,44 ****
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  * 1. Multiple backends on same machine.  Multiple backends listening on
!  *	  one relation.  (Note: "listening on a relation" is not really the
!  *	  right way to think about it, since the notify names need not have
!  *	  anything to do with the names of relations actually in the database.
!  *	  But this terminology is all over the code and docs, and I don't feel
!  *	  like trying to replace it.)
!  *
!  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
!  *	  ie, each relname/listenerPID pair.  The "notification" field of the
!  *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
!  *	  of the originating backend when a cross-backend NOTIFY is pending.
!  *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
!  *	  notification field should never be equal to the listenerPID field.)
!  *
!  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
!  *	  relname to a list of outstanding NOTIFY requests.  Actual processing
!  *	  happens if and only if we reach transaction commit.  At that time (in
!  *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
!  *	  If the listenerPID in a matching tuple is ours, we just send a notify
!  *	  message to our own front end.  If it is not ours, and "notification"
!  *	  is not already nonzero, we set notification to our own PID and send a
!  *	  PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by
!  *	  listenerPID).
!  *	  BTW: if the signal operation fails, we presume that the listener backend
!  *	  crashed without removing this tuple, and remove the tuple for it.
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
--- 14,60 ----
  
  /*-------------------------------------------------------------------------
   * New Async Notification Model:
!  *
!  * 1. Multiple backends on the same machine. Multiple backends listening on
!  *    several channels. (This used to be called a "relation" even though a
!  *    channel is just an identifier and has nothing to do with a database
!  *    relation.)
!  *
!  * 2. There is one central queue in the form of SLRU-backed, file-based
!  *    storage (directory pg_notify/), with several pages mapped into shared
!  *    memory.
!  *
!  *    There is no central record of which backend listens on which channel;
!  *    every backend keeps its own list.
!  *
!  *    Every backend that is listening on at least one channel registers by
!  *    entering its PID into the array of all backends. It then scans all
!  *    incoming notifications and compares the notified channels with its list.
!  *
!  *    If there is a match, it delivers the corresponding notification to
!  *    its frontend.
!  *
!  * 3. The NOTIFY statement (routine Async_Notify) registers the notification
!  *    in a list which is not processed until transaction end. Every
!  *    notification can additionally carry a "payload", an extra text
!  *    parameter used to convey arbitrary information to the recipient.
!  *
!  *    Duplicate notifications from the same transaction are sent out as one
!  *    notification only. This saves work when, for example, a trigger on a
!  *    2-million-row table fires a notification for each row that has been
!  *    changed. If the application needs to receive every notification that
!  *    has been sent, it can easily add some unique string to the extra
!  *    payload parameter.
!  *
!  *    Once the transaction commits, AtCommit_Notify performs the required
!  *    changes regarding listeners (LISTEN/UNLISTEN) and then appends the
!  *    pending notifications at the head of the queue. The head pointer of the
!  *    queue always points to the next free position; a position is just a
!  *    page number and an offset within that page.
!  *
!  *    After adding the notifications and adjusting the head pointer, the list
!  *    of listening backends is scanned and we send a PROCSIG_NOTIFY_INTERRUPT
!  *    to every backend that has registered its PID. (We don't know which
!  *    backend is listening on which channel, so we have to signal every
!  *    listening backend.)
   *
   * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
   *	  can call inbound-notify processing immediately if this backend is idle
***************
*** 46,93 ****
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
!  *	  matching our own listenerPID and having nonzero notification fields.
!  *	  For each such tuple, we send a message to our frontend and clear the
!  *	  notification field.  BTW: this routine has to start/commit its own
!  *	  transaction, since by assumption it is only called from outside any
!  *	  transaction.
!  *
!  * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
!  * of pending actions.	If we reach transaction commit, the changes are
!  * applied to pg_listener just before executing any pending NOTIFYs.  This
!  * method is necessary because to avoid race conditions, we must hold lock
!  * on pg_listener from when we insert a new listener tuple until we commit.
!  * To do that and not create undue hazard of deadlock, we don't want to
!  * touch pg_listener until we are otherwise done with the transaction;
!  * in particular it'd be uncool to still be taking user-commanded locks
!  * while holding the pg_listener lock.
!  *
!  * Although we grab ExclusiveLock on pg_listener for any operation,
!  * the lock is never held very long, so it shouldn't cause too much of
!  * a performance problem.  (Previously we used AccessExclusiveLock, but
!  * there's no real reason to forbid concurrent reads.)
   *
!  * An application that listens on the same relname it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.  Note,
!  * however, that we do *not* guarantee that a separate frontend message will
!  * be sent for every outside NOTIFY.  Since there is only room for one
!  * originating PID in pg_listener, outside notifies occurring at about the
!  * same time may be collapsed into a single message bearing the PID of the
!  * first outside backend to perform the NOTIFY.
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
  #include "catalog/pg_listener.h"
--- 62,103 ----
   *	  block).  Otherwise the handler may only set a flag, which will cause the
   *	  processing to occur just before we next go idle.
   *
!  * 5. Inbound-notify processing consists of reading all of the notifications
!  *	  that have arrived since the last scan. We read every notification until
!  *	  we reach the head pointer's position. Then we check whether we were the
!  *	  laziest backend: if our pointer was at the same position as the global
!  *	  tail pointer, we advance the tail to the position of the second-laziest
!  *	  backend (which we identify by inspecting the positions of all other
!  *	  backends' pointers). Whenever we move the tail pointer we also truncate
!  *	  now-unused pages (i.e. delete files in pg_notify/ that are no longer
!  *	  needed).
   *
!  * An application that listens on the same channel it notifies will get
   * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
   * by comparing be_pid in the NOTIFY message to the application's own backend's
!  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
   * frontend during startup.)  The above design guarantees that notifies from
!  * other backends will never be missed by ignoring self-notifies.
   *-------------------------------------------------------------------------
   */
  
+ /* XXX
+  *
+  * TODO:
+  *  - does a variable-length payload make sense?
+  *  - tests with multiple databases
+  *  - GUC parameter max_notifies_per_txn?
+  *  - adapt comments
+  */
+ 
  #include "postgres.h"
  
  #include <unistd.h>
  #include <signal.h>
  
  #include "access/heapam.h"
+ #include "access/slru.h"
+ #include "access/transam.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
  #include "catalog/pg_listener.h"
***************
*** 108,115 ****
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction.  As explained above,
!  * we don't actually modify pg_listener until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
--- 118,125 ----
  
  /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
!  * all actions requested in the current transaction. As explained above,
!  * we don't actually send notifications until we reach transaction commit.
   *
   * The list is kept in CurTransactionContext.  In subtransactions, each
   * subtransaction has its own list in its own CurTransactionContext, but
***************
*** 134,140 ****
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all relnames NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
--- 144,150 ----
  static List *upperPendingActions = NIL; /* list of upper-xact lists */
  
  /*
!  * State for outbound notifies consists of a list of all channels NOTIFYed
   * in the current transaction.	We do not actually perform a NOTIFY until
   * and unless the transaction commits.	pendingNotifies is NIL if no
   * NOTIFYs have been done in the current transaction.
***************
*** 149,158 ****
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
! static List *pendingNotifies = NIL;		/* list of C strings */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
  /*
   * State for inbound notifies consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
--- 159,274 ----
   * condition name, it will get a self-notify at commit.  This is a bit odd
   * but is consistent with our historical behavior.
   */
! 
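! /*
!  * In-memory representation of a single notification, used both for pending
!  * outbound notifications and for notifications read back from the queue.
!  */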
! typedef struct Notification
! {
! 	char   *channel;
! 	char   *payload;
! 	/* We only need one of the two, depending on whether we are sending a
! 	 * notification or receiving one. */
! 	union {
! 		int32 dstPid;
! 		int32 srcPid;
! 	};
! } Notification;
! 
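! /*
!  * Fixed-size representation of one notification as it is stored in the
!  * SLRU pages of the pg_notify/ queue.
!  */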
! typedef struct AsyncQueueEntry
! {
! 	Oid		dboid;
! 	int32	srcPid;
! 	char	channel[NAMEDATALEN];
! 	char	payload[NOTIFY_PAYLOAD_MAX_LENGTH];
! } AsyncQueueEntry;
! 
! #define	INVALID_PID (-1)
! #define QUEUE_POS_PAGE(x) ((x).page)
! #define QUEUE_POS_OFFSET(x) ((x).offset)
! #define QUEUE_POS_EQUAL(x,y) \
! 	 ((x).page == (y).page ? (y).offset == (x).offset : false)
! #define SET_QUEUE_POS(x,y,z) \
! 	do { \
! 		(x).page = (y); \
! 		(x).offset = (z); \
! 	} while (0)
! /* given the head position z, return the older (minimum) of positions x and y */
! #define QUEUE_POS_MIN(x,y,z) \
! 	(AsyncPagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \
! 	 AsyncPagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \
! 	 (x).offset < (y).offset ? (x) : (y))
! #define QUEUE_BACKEND_POS(i) asyncQueueControl->backend[(i)].pos
! #define QUEUE_BACKEND_PID(i) asyncQueueControl->backend[(i)].pid
! #define QUEUE_HEAD asyncQueueControl->head
! #define QUEUE_TAIL asyncQueueControl->tail
! 
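! /* A position in the queue: a page number plus a byte offset within that page */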
! typedef struct QueuePosition
! {
! 	int			page;
! 	int			offset;
! } QueuePosition;
! 
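! /* Per-backend entry in the shared array: the backend's PID and its read position */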
! typedef struct QueueBackendStatus
! {
! 	int32			pid;
! 	QueuePosition	pos;
! } QueueBackendStatus;
! 
! /*
!  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
!  *
!  * In SHARED mode, backends will only inspect their own entries plus the
!  * head and tail pointers. Consequently we can allow a backend to update its
!  * own record while holding only a shared lock (since no other backend will
!  * inspect it).
!  *
!  * In EXCLUSIVE mode, backends can inspect the entries of other backends and
!  * also change head and tail pointers.
!  *
!  * In order to avoid deadlocks, whenever we need both locks, we always first
!  * get AsyncQueueLock and then AsyncCtlLock.
!  */
! typedef struct AsyncQueueControl
! {
! 	QueuePosition		head;		/* head points to the next free location */
! 	QueuePosition 		tail;		/* the global tail is equivalent to the
! 									   tail of the "slowest" backend */
! 	TimestampTz			lastQueueFullWarn;	/* when the queue is full we only
! 											   want to log that once in a
! 											   while */
! 	QueueBackendStatus	backend[1];	/* this array actually has as many
! 									 * entries as allowed connections
! 									 * (MaxBackends) */
! 	/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
! } AsyncQueueControl;
! 
! static AsyncQueueControl   *asyncQueueControl;
! static SlruCtlData			AsyncCtlData;
! 
! #define AsyncCtl					(&AsyncCtlData)
! #define NUM_ASYNC_BUFFER_SLOTS		4
! #define QUEUE_PAGESIZE				BLCKSZ
! #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
! 
! /*
!  * slru.c currently assumes that all filenames are four hex digits, which
!  * means we can use segments 0000 through FFFF.  Each segment contains
!  * SLRU_PAGES_PER_SEGMENT pages, which gives us pages 0 through
!  * SLRU_PAGES_PER_SEGMENT * 0xFFFF.
!  *
!  * It would of course be easy to enhance slru.c, but these pages already give
!  * us so much space that it doesn't seem worth the trouble...
!  *
!  * A legitimate way to test queue-full behaviour is to define QUEUE_MAX_PAGE
!  * as a very small multiple of SLRU_PAGES_PER_SEGMENT.
!  */
! #define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0xFFFF)
! 
! 
! static List *pendingNotifies = NIL;		/* list of Notifications */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
+ static List *listenChannels = NIL;	/* list of channels we are listening to */
+ 
  /*
   * State for inbound notifies consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
***************
*** 174,207 ****
  
  static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static void Exec_Listen(Relation lRel, const char *relname);
! static void Exec_Unlisten(Relation lRel, const char *relname);
! static void Exec_UnlistenAll(Relation lRel);
! static void Send_Notify(Relation lRel);
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
! static bool AsyncExistsPendingNotify(const char *relname);
  static void ClearPendingActionsAndNotifies(void);
  
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the relation to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", relname);
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(relname))
  	{
  		/*
  		 * The name list needs to live until end of transaction, so store it
  		 * in the transaction context.
--- 290,431 ----
  
  static void queue_listen(ListenActionKind action, const char *condname);
  static void Async_UnlistenOnExit(int code, Datum arg);
! static bool IsListeningOn(const char *channel);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
! static void Exec_Listen(const char *channel);
! static void Exec_Unlisten(const char *channel);
! static void Exec_UnlistenAll(void);
! static void SignalBackends(void);
! static void Send_Notify(void);
! static bool AsyncPagePrecedesPhysically(int p, int q);
! static bool AsyncPagePrecedesLogically(int p, int q, int head);
! static bool asyncQueueAdvance(QueuePosition *position);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
! static List *asyncQueueAddEntries(List *notifications);
! static List *asyncQueueGetEntriesByPage(QueuePosition *current,
! 										QueuePosition stop);
! static void asyncQueueAdvanceTail();
  static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(const char *channel,
! 							 const char *payload,
! 							 int32 dstPid);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
  static void ClearPendingActionsAndNotifies(void);
  
+ static void asyncQueueAdvanceTail(void);
+ 
+ 
+ /*
+  * We work on the page range 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF).
+  * AsyncPagePrecedesPhysically just compares page numbers numerically,
+  * without any magic.
+  *
+  * AsyncPagePrecedesLogically, on the other hand, takes the current head page
+  * number into account: once we have wrapped around, p can logically precede
+  * q even though p > q (namely when the head page lies between the two).
+  */
+ static bool
+ AsyncPagePrecedesPhysically(int p, int q)
+ {
+ 	return p < q;
+ }
+ 
+ static bool
+ AsyncPagePrecedesLogically(int p, int q, int head)
+ {
+ 	if (p <= head && q <= head)
+ 		return p < q;
+ 	if (p > head && q > head)
+ 		return p < q;
+ 	if (p <= head)
+ 	{
+ 		Assert(q > head);
+ 		/* q is older */
+ 		return false;
+ 	}
+ 	else
+ 	{
+ 		Assert(p > head && q <= head);
+ 		/* p is older */
+ 		return true;
+ 	}
+ }
+ 
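+ /*
+  * AsyncShmemInit
+  *		Set up the shared queue state and the SLRU area (pg_notify/) used
+  *		for asynchronous notifications.
+  */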
+ void
+ AsyncShmemInit(void)
+ {
+ 	bool	found;
+ 	int		slotno;
+ 	Size	size;
+ 
+ 	/*
+ 	 * Remember that sizeof(AsyncQueueControl) already contains one member of
+ 	 * QueueBackendStatus, so we only need to add the status space requirement
+ 	 * for MaxBackends-1 backends.
+ 	 */
+ 	size = mul_size(MaxBackends-1, sizeof(QueueBackendStatus));
+ 	size = add_size(size, sizeof(AsyncQueueControl));
+ 
+ 	asyncQueueControl = (AsyncQueueControl *)
+ 		ShmemInitStruct("Async Queue Control", size, &found);
+ 
+ 	if (!asyncQueueControl)
+ 		elog(ERROR, "out of memory");
+ 
+ 	if (!found)
+ 	{
+ 		int		i;
+ 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
+ 		SET_QUEUE_POS(QUEUE_TAIL, QUEUE_MAX_PAGE, 0);
+ 		for (i = 0; i < MaxBackends; i++)
+ 		{
+ 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ 			QUEUE_BACKEND_PID(i) = INVALID_PID;
+ 		}
+ 	}
+ 
+ 	AsyncCtl->PagePrecedes = AsyncPagePrecedesPhysically;
+ 	SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFER_SLOTS, 0,
+ 				  AsyncCtlLock, "pg_notify");
+ 	AsyncCtl->do_fsync = false;
+ 	asyncQueueControl->lastQueueFullWarn = GetCurrentTimestamp();
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ 	slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ 	SimpleLruWritePage(AsyncCtl, slotno, NULL);
+ 	LWLockRelease(AsyncCtlLock);
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	SlruScanDirectory(AsyncCtl, QUEUE_MAX_PAGE, true);
+ }
+ 
  
  /*
   * Async_Notify
   *
   *		This is executed by the SQL notify command.
   *
!  *		Adds the channel to the list of pending notifies.
   *		Actual notification happens during transaction commit.
   *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   */
  void
! Async_Notify(const char *channel, const char *payload)
  {
+ 
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Notify(%s)", channel);
! 
! 	/*
! 	 * XXX - do we need a GUC parameter max_notifies_per_txn now?
! 	 */
  
  	/* no point in making duplicate entries in the list ... */
! 	if (!AsyncExistsPendingNotify(channel, payload))
  	{
+ 		Notification *n;
  		/*
  		 * The name list needs to live until end of transaction, so store it
  		 * in the transaction context.
***************
*** 210,221 ****
  
  		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
  		/*
! 		 * Ordering of the list isn't important.  We choose to put new entries
! 		 * on the front, as this might make duplicate-elimination a tad faster
! 		 * when the same condition is signaled many times in a row.
  		 */
! 		pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
  
  		MemoryContextSwitchTo(oldcontext);
  	}
--- 434,452 ----
  
  		oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
+ 		n = (Notification *) palloc(sizeof(Notification));
+ 		n->channel = pstrdup(channel);
+ 		if (payload)
+ 			n->payload = pstrdup(payload);
+ 		else
+ 			n->payload = "";
+ 		n->dstPid = INVALID_PID;
+ 
  		/*
! 		 * We want to preserve the order of notifications, so we append every
! 		 * new one to the end of the list.  See the comments at
! 		 * AsyncExistsPendingNotify().
  		 */
! 		pendingNotifies = lappend(pendingNotifies, n);
  
  		MemoryContextSwitchTo(oldcontext);
  	}
***************
*** 259,270 ****
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, relname);
  }
  
  /*
--- 490,501 ----
   *		This is executed by the SQL listen command.
   */
  void
! Async_Listen(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
  
! 	queue_listen(LISTEN_LISTEN, channel);
  }
  
  /*
***************
*** 273,288 ****
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *relname)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, relname);
  }
  
  /*
--- 504,519 ----
   *		This is executed by the SQL unlisten command.
   */
  void
! Async_Unlisten(const char *channel)
  {
  	if (Trace_notify)
! 		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
  
  	/* If we couldn't possibly be listening, no need to queue anything */
  	if (pendingActions == NIL && !unlistenExitRegistered)
  		return;
  
! 	queue_listen(LISTEN_UNLISTEN, channel);
  }
  
  /*
***************
*** 306,313 ****
  /*
   * Async_UnlistenOnExit
   *
-  *		Clean up the pg_listener table at backend exit.
-  *
   *		This is executed if we have done any LISTENs in this backend.
   *		It might not be necessary anymore, if the user UNLISTENed everything,
   *		but we don't try to detect that case.
--- 537,542 ----
***************
*** 315,331 ****
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
- 	/*
- 	 * We need to start/commit a transaction for the unlisten, but if there is
- 	 * already an active transaction we had better abort that one first.
- 	 * Otherwise we'd end up committing changes that probably ought to be
- 	 * discarded.
- 	 */
  	AbortOutOfAnyTransaction();
! 	/* Now we can do the unlisten */
! 	StartTransactionCommand();
! 	Async_UnlistenAll();
! 	CommitTransactionCommand();
  }
  
  /*
--- 544,551 ----
  static void
  Async_UnlistenOnExit(int code, Datum arg)
  {
  	AbortOutOfAnyTransaction();
! 	Exec_UnlistenAll();
  }
  
  /*
***************
*** 348,357 ****
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		const char *relname = (const char *) lfirst(p);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   relname, strlen(relname) + 1);
  	}
  
  	/*
--- 568,582 ----
  	/* We can deal with pending NOTIFY though */
  	foreach(p, pendingNotifies)
  	{
! 		AsyncQueueEntry qe;
! 		Notification   *n;
! 
! 		n = (Notification *) lfirst(p);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
  
  		RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! 							   &qe, sizeof(AsyncQueueEntry));
  	}
  
  	/*
***************
*** 367,386 ****
   *
   *		This is called at transaction commit.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, insert or delete
!  *		tuples in pg_listener accordingly.
   *
!  *		If there are outbound notify requests in the pendingNotifies list,
!  *		scan pg_listener for matching tuples, and either signal the other
!  *		backend or send a message to our own frontend.
!  *
!  *		NOTE: we are still inside the current transaction, therefore can
!  *		piggyback on its committing of changes.
   */
  void
  AtCommit_Notify(void)
  {
- 	Relation	lRel;
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
--- 592,606 ----
   *
   *		This is called at transaction commit.
   *
!  *		If there are pending LISTEN/UNLISTEN actions, update our
!  *		"listenChannels" list.
   *
!  *		If there are outbound notify requests in the pendingNotifies list, add
!  *		them to the global queue and signal any backend that is listening.
   */
  void
  AtCommit_Notify(void)
  {
  	ListCell   *p;
  
  	if (pendingActions == NIL && pendingNotifies == NIL)
***************
*** 399,407 ****
  	if (Trace_notify)
  		elog(DEBUG1, "AtCommit_Notify");
  
- 	/* Acquire ExclusiveLock on pg_listener */
- 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
- 
  	/* Perform any pending listen/unlisten actions */
  	foreach(p, pendingActions)
  	{
--- 619,624 ----
***************
*** 410,442 ****
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(lRel, actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll(lRel);
  				break;
  		}
- 
- 		/* We must CCI after each action in case of conflicting actions */
- 		CommandCounterIncrement();
  	}
  
- 	/* Perform any pending notifies */
- 	if (pendingNotifies)
- 		Send_Notify(lRel);
- 
  	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that notified backends see our tuple updates when they look. Else they
! 	 * might disregard the signal, which would make the application programmer
! 	 * very unhappy.  Also, this prevents race conditions when we have just
! 	 * inserted a listening tuple.
  	 */
! 	heap_close(lRel, NoLock);
  
  	ClearPendingActionsAndNotifies();
  
--- 627,649 ----
  		switch (actrec->action)
  		{
  			case LISTEN_LISTEN:
! 				Exec_Listen(actrec->condname);
  				break;
  			case LISTEN_UNLISTEN:
! 				Exec_Unlisten(actrec->condname);
  				break;
  			case LISTEN_UNLISTEN_ALL:
! 				Exec_UnlistenAll();
  				break;
  		}
  	}
  
  	/*
! 	 * Perform any pending notifies.  Note that whatever we add to the queue
! 	 * becomes immediately visible to every other process.
  	 */
! 	if (pendingNotifies)
! 		Send_Notify();
  
  	ClearPendingActionsAndNotifies();
  
***************
*** 445,508 ****
  }
  
  /*
   * Exec_Listen --- subroutine for AtCommit_Notify
   *
!  *		Register the current backend as listening on the specified relation.
   */
  static void
! Exec_Listen(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
! 	Datum		values[Natts_pg_listener];
! 	bool		nulls[Natts_pg_listener];
! 	NameData	condname;
! 	bool		alreadyListener = false;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
! 
! 	/* Detect whether we are already listening on this relname */
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! 
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
! 		{
! 			alreadyListener = true;
! 			/* No need to scan the rest of the table */
! 			break;
! 		}
! 	}
! 	heap_endscan(scan);
  
! 	if (alreadyListener)
  		return;
  
  	/*
! 	 * OK to insert a new tuple
  	 */
- 	memset(nulls, false, sizeof(nulls));
- 
- 	namestrcpy(&condname, relname);
- 	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
- 	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
- 	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);		/* no notifies pending */
- 
- 	tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
- 
- 	simple_heap_insert(lRel, tuple);
  
! #ifdef NOT_USED					/* currently there are no indexes */
! 	CatalogUpdateIndexes(lRel, tuple);
! #endif
  
! 	heap_freetuple(tuple);
  
  	/*
! 	 * now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
--- 652,722 ----
  }
  
  /*
+  * This function is executed for every notification found in the queue, to
+  * check whether the current backend is listening on that channel. It is an
+  * open question whether we should optimize this further, for example by
+  * converting listenChannels to a sorted array and using binary search on it...
+  */
+ static bool
+ IsListeningOn(const char *channel)
+ {
+ 	ListCell   *p;
+ 
+ 	foreach(p, listenChannels)
+ 	{
+ 		char *lchan = (char *) lfirst(p);
+ 		if (strcmp(lchan, channel) == 0)
+ 			/* already listening on this channel */
+ 			return true;
+ 	}
+ 	return false;
+ }
+ 
+ 
+ /*
   * Exec_Listen --- subroutine for AtCommit_Notify
   *
!  *		Register the current backend as listening on the specified channel.
   */
  static void
! Exec_Listen(const char *channel)
  {
! 	MemoryContext oldcontext;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
  
! 	/* Detect whether we are already listening on this channel */
! 	if (IsListeningOn(channel))
  		return;
  
  	/*
! 	 * OK to insert to the list.
  	 */
  
! 	if (listenChannels == NIL)
! 	{
! 		/*
! 		 * This is our first LISTEN, establish our pointer.
! 		 */
! 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 		QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
! 		QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
! 		LWLockRelease(AsyncQueueLock);
! 		/*
! 		 * This is actually only necessary if we are the very first listener
! 		 * overall (the tail pointer must coincide with the position of at
! 		 * least one listening backend).
! 		 */
! 		asyncQueueAdvanceTail();
! 	}
  
! 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
! 	listenChannels = lappend(listenChannels, pstrdup(channel));
! 	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Now that we are listening, make sure we will unlisten before dying.
  	 */
  	if (!unlistenExitRegistered)
  	{
***************
*** 514,551 ****
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the current backend from the list of listening backends
!  *		for the specified relation.
   */
  static void
! Exec_Unlisten(Relation lRel, const char *relname)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	tuple;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
  
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
  	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! 
! 		if (listener->listenerpid == MyProcPid &&
! 			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
  		{
- 			/* Found the matching tuple, delete it */
- 			simple_heap_delete(lRel, &tuple->t_self);
- 
  			/*
! 			 * We assume there can be only one match, so no need to scan the
! 			 * rest of the table
  			 */
! 			break;
  		}
  	}
! 	heap_endscan(scan);
! 
  	/*
  	 * We do not complain about unlistening something not being listened;
  	 * should we?
--- 728,780 ----
  /*
   * Exec_Unlisten --- subroutine for AtCommit_Notify
   *
!  *		Remove the specified channel from "listenChannels".
   */
  static void
! Exec_Unlisten(const char *channel)
  {
! 	ListCell   *p;
! 	ListCell   *prev = NULL;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
  
! 	/* Detect whether we are already listening on this channel */
! 	foreach(p, listenChannels)
  	{
! 		char *lchan = (char *) lfirst(p);
! 		if (strcmp(lchan, channel) == 0)
  		{
  			/*
! 			 * Since the list lives in TopMemoryContext, we free the channel
! 			 * string explicitly.  The ListCell itself is freed by
! 			 * list_delete_cell().
  			 */
! 			pfree(lchan);
! 			listenChannels = list_delete_cell(listenChannels, p, prev);
! 			if (listenChannels == NIL)
! 			{
! 				bool advanceTail = false;
! 				/*
! 				 * This backend is not listening anymore.
! 				 */
! 				LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 				QUEUE_BACKEND_PID(MyBackendId) = INVALID_PID;
! 
! 				/*
! 				 * If our position was the global tail, the tail pointer can
! 				 * now possibly be advanced.
! 				 */
! 				if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! 					advanceTail = true;
! 				LWLockRelease(AsyncQueueLock);
! 
! 				if (advanceTail)
! 					asyncQueueAdvanceTail();
! 			}
! 			return;
  		}
+ 		prev = p;
  	}
! 	
  	/*
  	 * We do not complain about unlistening something not being listened;
  	 * should we?
***************
*** 555,677 ****
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Update pg_listener to unlisten all relations for this backend.
   */
  static void
! Exec_UnlistenAll(Relation lRel)
  {
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple;
! 	ScanKeyData key[1];
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll");
  
! 	/* Find and delete all entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
  
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 		simple_heap_delete(lRel, &lTuple->t_self);
  
! 	heap_endscan(scan);
  }
  
  /*
!  * Send_Notify --- subroutine for AtCommit_Notify
!  *
!  *		Scan pg_listener for tuples matching our pending notifies, and
!  *		either signal the other backend or send a message to our own frontend.
   */
  static void
! Send_Notify(Relation lRel)
  {
! 	TupleDesc	tdesc = RelationGetDescr(lRel);
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 
! 	/* preset data to update notify column to MyProcPid */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
! 
! 	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		listenerPID = listener->listenerpid;
  
! 		if (!AsyncExistsPendingNotify(relname))
! 			continue;
  
! 		if (listenerPID == MyProcPid)
! 		{
! 			/*
! 			 * Self-notify: no need to bother with table update. Indeed, we
! 			 * *must not* clear the notification field in this path, or we
! 			 * could lose an outside notify, which'd be bad for applications
! 			 * that ignore self-notify messages.
! 			 */
! 			if (Trace_notify)
! 				elog(DEBUG1, "AtCommit_Notify: notifying self");
  
! 			NotifyMyFrontEnd(relname, listenerPID);
! 		}
! 		else
  		{
! 			if (Trace_notify)
! 				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
! 					 listenerPID);
  
! 			/*
! 			 * If someone has already notified this listener, we don't bother
! 			 * modifying the table, but we do still send a NOTIFY_INTERRUPT
! 			 * signal, just in case that backend missed the earlier signal for
! 			 * some reason.  It's OK to send the signal first, because the
! 			 * other guy can't read pg_listener until we unlock it.
! 			 *
! 			 * Note: we don't have the other guy's BackendId available, so
! 			 * this will incur a search of the ProcSignal table.  That's
! 			 * probably not worth worrying about.
! 			 */
! 			if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
! 							   InvalidBackendId) < 0)
! 			{
! 				/*
! 				 * Get rid of pg_listener entry if it refers to a PID that no
! 				 * longer exists.  Presumably, that backend crashed without
! 				 * deleting its pg_listener entries. This code used to only
! 				 * delete the entry if errno==ESRCH, but as far as I can see
! 				 * we should just do it for any failure (certainly at least
! 				 * for EPERM too...)
! 				 */
! 				simple_heap_delete(lRel, &lTuple->t_self);
! 			}
! 			else if (listener->notification == 0)
! 			{
! 				/* Rewrite the tuple with my PID in notification column */
! 				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 				simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 				CatalogUpdateIndexes(lRel, rTuple);
! #endif
! 			}
  		}
  	}
  
! 	heap_endscan(scan);
  }
  
  /*
--- 784,1028 ----
  /*
   * Exec_UnlistenAll --- subroutine for AtCommit_Notify
   *
!  *		Unlisten on all channels for this backend.
   */
  static void
! Exec_UnlistenAll(void)
  {
! 	bool advanceTail = false;
  
  	if (Trace_notify)
! 		elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
! 
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	QUEUE_BACKEND_PID(MyBackendId) = INVALID_PID;
! 
! 	/*
! 	 * Since the list lives in TopMemoryContext, free its memory explicitly.
! 	 */
! 	list_free_deep(listenChannels);
! 	listenChannels = NIL;
! 
! 	/*
! 	 * If our position was the global tail, the tail pointer can now possibly
! 	 * be advanced.
! 	 */
! 	if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! 		advanceTail = true;
! 	LWLockRelease(AsyncQueueLock);
! 
! 	if (advanceTail)
! 		asyncQueueAdvanceTail();
! }
  
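! /*
!  * Check whether the queue is full, i.e. whether writing one more entry would
!  * make the head wrap around onto the page that the tail pointer is on.
!  */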
! static bool
! asyncQueueIsFull(void)
! {
! 	QueuePosition	lookahead = QUEUE_HEAD;
  
! 	/*
! 	 * Check what would happen if we wrote one more entry: would we move on to
! 	 * a new page? If not, the queue cannot be full, because the current page
! 	 * still has room for at least one more entry.
! 	 */
! 	if (!asyncQueueAdvance(&lookahead))
! 		return false;
  
! 	/*
! 	 * The queue is full if switching to a new page would land us on the page
! 	 * that the tail pointer is on.
! 	 */
! 	return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL);
  }
  
  /*
!  * Advance the position to the next entry.  Returns true if we had to jump to
!  * a new page, else false.
   */
+ static bool
+ asyncQueueAdvance(QueuePosition *position)
+ {
+ 	int		pageno = QUEUE_POS_PAGE(*position);
+ 	int		offset = QUEUE_POS_OFFSET(*position);
+ 	bool	pageJump = false;
+ 
+ 	/*
+ 	 * Move to the next position: first jump over the entry we have just
+ 	 * written or read.
+ 	 */
+ 	offset += sizeof(AsyncQueueEntry);
+ 	Assert(offset < QUEUE_PAGESIZE);
+ 
+ 	/*
+ 	 * In a second step, check whether another entry can still be written to
+ 	 * this page. If so, stay here; we have reached the next position. If not,
+ 	 * we need to move on to the next page.
+ 	 */
+ 	if (offset + sizeof(AsyncQueueEntry) >= QUEUE_PAGESIZE)
+ 	{
+ 		pageno++;
+ 		if (pageno > QUEUE_MAX_PAGE)
+ 			/* wrap around */
+ 			pageno = 0;
+ 		offset = 0;
+ 		pageJump = true;
+ 	}
+ 
+ 	SET_QUEUE_POS(*position, pageno, offset);
+ 	return pageJump;
+ }
+ 
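+ /*
+  * Convert an in-memory Notification into the fixed-size AsyncQueueEntry
+  * format that is written into the queue, stamping it with our database OID
+  * and PID.
+  */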
  static void
! asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  {
! 	Assert(n->channel);
! 	Assert(n->payload);
  
! 	qe->srcPid = MyProcPid;
! 	qe->dboid = MyDatabaseId;
! 	strcpy(qe->channel, n->channel);
! 	strcpy(qe->payload, n->payload);
! }
  
! static List *
! asyncQueueAddEntries(List *notifications)
! {
! 	int			pageno;
! 	int			offset;
! 	int			slotno;
  
! 	/*
! 	 * Note that we are holding exclusive AsyncQueueLock already.
! 	 */
! 	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
! 	pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
! 	slotno = SimpleLruReadPage(AsyncCtl, pageno,
! 							   true, InvalidTransactionId);
! 	AsyncCtl->shared->page_dirty[slotno] = true;
! 
! 	do
! 	{
! 		AsyncQueueEntry	qe;
! 		Notification   *n;
! 
! 		if (asyncQueueIsFull())
  		{
! 			/*
! 			 * The queue is full.  The head cannot be at offset 0 here, so we
! 			 * will not enter the page-initialization branch further down.
! 			 */
! 			Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0);
! 			break;
! 		}
  
! 		n = (Notification *) linitial(notifications);
! 
! 		asyncQueueNotificationToEntry(n, &qe);
! 
! 		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
! 		Assert(offset + sizeof(AsyncQueueEntry) < QUEUE_PAGESIZE);
! 		memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset,
! 			   &qe, sizeof(AsyncQueueEntry));
! 
! 		notifications = list_delete_first(notifications);
! 
! 	} while (!asyncQueueAdvance(&(QUEUE_HEAD)) && notifications != NIL);
! 
! 	if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
! 	{
! 		/*
! 		 * If the next entry needs to go to a new page, prepare that page
! 		 * already.
! 		 */
! 		slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
! 		AsyncCtl->shared->page_dirty[slotno] = true;
! 		SimpleLruWritePage(AsyncCtl, slotno, NULL);
! 	}
! 	LWLockRelease(AsyncCtlLock);
! 
! 	return notifications;
! }
! 
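! /*
!  * Emit a warning that the queue is full, but at most once every
!  * QUEUE_FULL_WARN_INTERVAL milliseconds.
!  */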
! static void
! asyncQueueFullWarning(void)
! {
! 	/*
! 	 * Caller must hold exclusive AsyncQueueLock.
! 	 */
! 	TimestampTz t = GetCurrentTimestamp();
! 	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFullWarn,
! 								   t, QUEUE_FULL_WARN_INTERVAL))
! 	{
! 		ereport(WARNING, (errmsg("pg_notify ring buffer is full")));
! 		asyncQueueControl->lastQueueFullWarn = t;
! 	}
! }
! 
! /*
!  * Send_Notify --- subroutine for AtCommit_Notify
!  *
!  * Add the pending notifications to the queue and signal the listening
!  * backends.
!  *
!  * A full queue is very uncommon and should really not happen, given how much
!  * space is available in our SLRU pages. Nevertheless we need to deal with the
!  * possibility. Note that when we get here we are in the middle of committing
!  * our transaction: we have already committed in clog, but we have not yet
!  * released all of our resources, so if we block here we keep those blocked
!  * for longer as well...
!  */
! static void
! Send_Notify(void)
! {
! 	while (pendingNotifies != NIL)
! 	{
! 		LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 		while (asyncQueueIsFull())
! 		{
! 			asyncQueueFullWarning();
! 			LWLockRelease(AsyncQueueLock);
! 			SignalBackends();
! 			ProcessIncomingNotify();
! 			asyncQueueAdvanceTail();
! 			pg_usleep(100 * 1000L); /* 100ms */
! 			LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
  		}
+ 		Assert(pendingNotifies != NIL);
+ 		pendingNotifies = asyncQueueAddEntries(pendingNotifies);
+ 		LWLockRelease(AsyncQueueLock);
  	}
+ 	SignalBackends();
+ }
+ 
  
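! /*
!  * Send a PROCSIG_NOTIFY_INTERRUPT signal to every listening backend. We
!  * collect the PIDs and backend ids while holding AsyncQueueLock, and send
!  * the signals only after releasing the lock.
!  */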
! static void
! SignalBackends(void)
! {
! 	ListCell	   *p1, *p2;
! 	int				i;
! 	int32			pid;
! 	List		   *pids = NIL;
! 	List		   *ids = NIL;
! 
! 	/* Signal everybody who is LISTENing on any channel */
! 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! 	for (i = 0; i < MaxBackends; i++)
! 	{
! 		pid = QUEUE_BACKEND_PID(i);
! 		if (pid != INVALID_PID)
! 		{
! 			pids = lappend_int(pids, pid);
! 			ids = lappend_int(ids, i);
! 		}
! 	}
! 	LWLockRelease(AsyncQueueLock);
! 	
! 	forboth(p1, pids, p2, ids)
! 	{
! 		pid = (int32) lfirst_int(p1);
! 		i = lfirst_int(p2);
! 		/* XXX
! 		 * Should we check for failure? Can it happen that a backend
! 		 * has crashed without the postmaster starting over?
! 		 */
! 		SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i);
! 	}
  }
  
  /*
***************
*** 940,968 ****
  }
  
  /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan pg_listener for arriving notifies, report them to my front end,
!  *		and clear the notification field in pg_listener until next time.
   *
!  *		NOTE: since we are outside any transaction, we must create our own.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	Relation	lRel;
! 	TupleDesc	tdesc;
! 	ScanKeyData key[1];
! 	HeapScanDesc scan;
! 	HeapTuple	lTuple,
! 				rTuple;
! 	Datum		value[Natts_pg_listener];
! 	bool		repl[Natts_pg_listener],
! 				nulls[Natts_pg_listener];
! 	bool		catchup_enabled;
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
--- 1291,1398 ----
  }
  
  /*
+  * This function will ask for a page with ReadOnly access and once we have the
+  * lock already, we read the whole content and pass back a list of
+  * Notifications that the calling function will deliver then.
+  *
+  * We stop if we have either reached the stop position or go to a new page.
+  */
+ static List *
+ asyncQueueGetEntriesByPage(QueuePosition *current, QueuePosition stop)
+ {
+ 	int				slotno;
+ 	AsyncQueueEntry	qe;
+ 	List	  	   *notifications = NIL;
+ 	Notification   *n;
+ 
+ 	if (QUEUE_POS_EQUAL(*current, stop))
+ 		return NIL;
+ 
+ 	slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page,
+ 										InvalidTransactionId);
+ 	do {
+ 		memcpy(&qe, 
+ 			   (char *) (AsyncCtl->shared->page_buffer[slotno]) + current->offset,
+ 			   sizeof(AsyncQueueEntry));
+ 
+ 		if (qe.dboid == MyDatabaseId && IsListeningOn(qe.channel))
+ 		{
+ 			n = (Notification *) palloc(sizeof(Notification));
+ 			n->channel = pstrdup(qe.channel);
+ 			n->payload = pstrdup(qe.payload);
+ 			n->srcPid = qe.srcPid;
+         
+ 			notifications = lappend(notifications, n);
+ 		}
+ 
+ 		/*
+ 		 * asyncQueueAdvance() jumps over the entry we have just read. If
+ 		 * there is no room for another record on the current page, it also
+ 		 * moves on to the beginning of the next page.
+ 		 */
+ 	} while (!asyncQueueAdvance(current) && !QUEUE_POS_EQUAL(*current, stop));
+ 
+ 	LWLockRelease(AsyncCtlLock);
+ 
+ 	return notifications;
+ }
+ 
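+ /*
+  * Advance the shared tail pointer to the position of the slowest listening
+  * backend and, if the tail has left its old page behind, truncate the SLRU
+  * segments that are no longer needed.
+  */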
+ static void
+ asyncQueueAdvanceTail(void)
+ {
+ 	QueuePosition	min;
+ 	int				i;
+ 	int				tailPage;
+ 	int				headPage;
+ 
+ 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ 	min = QUEUE_HEAD;
+ 	for (i = 0; i < MaxBackends; i++)
+ 		if (QUEUE_BACKEND_PID(i) != INVALID_PID)
+ 			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+ 
+ 	tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+ 	headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
+ 	QUEUE_TAIL = min;
+ 	LWLockRelease(AsyncQueueLock);
+ 
+ 	if (QUEUE_POS_PAGE(min) == tailPage)
+ 		return;
+ 
+ 	/* This is our wraparound check */
+ 	if (AsyncPagePrecedesLogically(tailPage, QUEUE_POS_PAGE(min), headPage)
+ 		&& AsyncPagePrecedesPhysically(tailPage, headPage))
+ 	{
+ 		/*
+ 		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+ 		 * release the lock again.
+ 		 */
+ 		SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min));
+ 	}
+ }
+ 
+ /*
   * ProcessIncomingNotify
   *
   *		Deal with arriving NOTIFYs from other backends.
   *		This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
   *		signal handler, or the next time control reaches the outer idle loop.
!  *		Scan the queue for arriving notifications and report them to my front
!  *		end.
   *
!  *		NOTE: we are outside of any transaction here.
   */
  static void
  ProcessIncomingNotify(void)
  {
! 	QueuePosition	pos;
! 	QueuePosition	oldpos;
! 	QueuePosition	head;
! 	List		   *notifications;
! 	bool			catchup_enabled;
! 	bool			advanceTail = false;
! 
! 	Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId);
  
  	/* Must prevent catchup interrupt while I am running */
  	catchup_enabled = DisableCatchupInterrupt();
***************
*** 974,1037 ****
  
  	notifyInterruptOccurred = 0;
  
! 	StartTransactionCommand();
  
! 	lRel = heap_open(ListenerRelationId, ExclusiveLock);
! 	tdesc = RelationGetDescr(lRel);
  
! 	/* Scan only entries with my listenerPID */
! 	ScanKeyInit(&key[0],
! 				Anum_pg_listener_listenerpid,
! 				BTEqualStrategyNumber, F_INT4EQ,
! 				Int32GetDatum(MyProcPid));
! 	scan = heap_beginscan(lRel, SnapshotNow, 1, key);
! 
! 	/* Prepare data for rewriting 0 into notification field */
! 	memset(nulls, false, sizeof(nulls));
! 	memset(repl, false, sizeof(repl));
! 	repl[Anum_pg_listener_notification - 1] = true;
! 	memset(value, 0, sizeof(value));
! 	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
! 
! 	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! 	{
! 		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! 		char	   *relname = NameStr(listener->relname);
! 		int32		sourcePID = listener->notification;
  
! 		if (sourcePID != 0)
  		{
! 			/* Notify the frontend */
! 
! 			if (Trace_notify)
! 				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
! 					 relname, (int) sourcePID);
! 
! 			NotifyMyFrontEnd(relname, sourcePID);
! 
! 			/*
! 			 * Rewrite the tuple with 0 in notification column.
! 			 */
! 			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! 			simple_heap_update(lRel, &lTuple->t_self, rTuple);
! 
! #ifdef NOT_USED					/* currently there are no indexes */
! 			CatalogUpdateIndexes(lRel, rTuple);
! #endif
  		}
! 	}
! 	heap_endscan(scan);
! 
! 	/*
! 	 * We do NOT release the lock on pg_listener here; we need to hold it
! 	 * until end of transaction (which is about to happen, anyway) to ensure
! 	 * that other backends see our tuple updates when they look. Otherwise, a
! 	 * transaction started after this one might mistakenly think it doesn't
! 	 * need to send this backend a new NOTIFY.
! 	 */
! 	heap_close(lRel, NoLock);
  
! 	CommitTransactionCommand();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
--- 1404,1452 ----
  
  	notifyInterruptOccurred = 0;
  
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	pos = QUEUE_BACKEND_POS(MyBackendId);
! 	oldpos = QUEUE_BACKEND_POS(MyBackendId);
! 	head = QUEUE_HEAD;
! 	LWLockRelease(AsyncQueueLock);
  
! 	/* If we have already read all notifications, there is nothing to do. */
! 	if (QUEUE_POS_EQUAL(pos, head))
! 	{
! 		/* Re-enable the catchup interrupt before returning early. */
! 		if (catchup_enabled)
! 			EnableCatchupInterrupt();
! 		return;
! 	}
  
! 	do 
! 	{
! 		ListCell	   *lc;
! 		Notification   *n;
  
! 		/*
! 		 * Our stop position is what we found to be the head's position when
! 		 * we entered this function. It might have changed already. But if it
! 		 * has, we will receive (or have already received and queued) another
! 		 * signal and come here again.
! 		 *
! 		 * We are not holding AsyncQueueLock here! The queue can only extend
! 		 * beyond the head pointer (see above) and we leave our backend's
! 		 * pointer where it is so nobody will truncate or rewrite pages under
! 		 * us.
! 		 */
! 		notifications = asyncQueueGetEntriesByPage(&pos, head);
! 		foreach(lc, notifications)
  		{
! 			n = (Notification *) lfirst(lc);
! 			NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
  		}
! 	} while (!QUEUE_POS_EQUAL(pos, head));
  
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	QUEUE_BACKEND_POS(MyBackendId) = pos;
! 	if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL))
! 		advanceTail = true;
! 	LWLockRelease(AsyncQueueLock);
! 
! 	if (advanceTail)
! 		/* Move forward the tail pointer and try to truncate. */
! 		asyncQueueAdvanceTail();
  
  	/*
  	 * Must flush the notify messages to ensure frontend gets them promptly.
***************
*** 1051,1070 ****
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(char *relname, int32 listenerPID)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, listenerPID, sizeof(int32));
! 		pq_sendstring(&buf, relname);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 		{
! 			/* XXX Add parameter string here later */
! 			pq_sendstring(&buf, "");
! 		}
  		pq_endmessage(&buf);
  
  		/*
--- 1466,1482 ----
   * Send NOTIFY message to my front end.
   */
  static void
! NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
  {
  	if (whereToSendOutput == DestRemote)
  	{
  		StringInfoData buf;
  
  		pq_beginmessage(&buf, 'A');
! 		pq_sendint(&buf, srcPid, sizeof(int32));
! 		pq_sendstring(&buf, channel);
  		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! 			pq_sendstring(&buf, payload);
  		pq_endmessage(&buf);
  
  		/*
***************
*** 1074,1096 ****
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", relname);
  }
  
  /* Does pendingNotifies include the given relname? */
  static bool
! AsyncExistsPendingNotify(const char *relname)
  {
  	ListCell   *p;
  
! 	foreach(p, pendingNotifies)
! 	{
! 		const char *prelname = (const char *) lfirst(p);
  
! 		if (strcmp(prelname, relname) == 0)
  			return true;
  	}
  
  	return false;
  }
  
--- 1486,1538 ----
  		 */
  	}
  	else
! 		elog(INFO, "NOTIFY for %s", channel);
  }
  
! /* Does pendingNotifies already include this channel/payload combination? */
  static bool
! AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
  	ListCell   *p;
+ 	Notification *n;
  
! 	if (pendingNotifies == NIL)
! 		return false;
! 
! 	if (payload == NULL)
! 		payload = "";
! 
! 	/*
! 	 * New elements must be appended to the end of the list to preserve the
! 	 * order of the notifies.  On the other hand, we would like to scan the
! 	 * list backwards, so that duplicate elimination is a tad faster when the
! 	 * same condition is signaled many times in a row.  As a compromise, we
! 	 * check the tail element first, which we can access directly; if it does
! 	 * not match, we scan the rest of the list from the head.
! 	 */
  
! 	n = (Notification *) llast(pendingNotifies);
! 	if (strcmp(n->channel, channel) == 0)
! 	{
! 		Assert(n->payload != NULL);
! 		if (strcmp(n->payload, payload) == 0)
  			return true;
  	}
  
+ 	for (p = list_head(pendingNotifies);
+ 		p != list_tail(pendingNotifies);
+ 		p = lnext(p))
+ 	{
+ 		n = (Notification *) lfirst(p);
+ 
+ 		if (strcmp(n->channel, channel) == 0)
+ 		{
+ 			Assert(n->payload != NULL);
+ 			if (strcmp(n->payload, payload) == 0)
+ 				return true;
+ 		}
+ 	}
+ 
  	return false;
  }
  
***************
*** 1124,1128 ****
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	Async_Notify((char *) recdata);
  }
--- 1566,1576 ----
  	 * there is any significant delay before I commit.	OK for now because we
  	 * disallow COMMIT PREPARED inside a transaction block.)
  	 */
! 	AsyncQueueEntry		*qe = (AsyncQueueEntry *) recdata;
! 
! 	Assert(len == sizeof(AsyncQueueEntry));
! 	Assert(qe->dboid == MyDatabaseId);
! 
! 	Async_Notify(qe->channel, qe->payload);
  }
+ 
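
The async.c hunks above use a Notification list element and a fixed-size
AsyncQueueEntry without showing their definitions, which live in parts of the
patch not included in this excerpt.  The following is only a rough sketch of
the shapes implied by the referenced fields (channel, payload, srcPid, dboid)
and by the Assert on sizeof(AsyncQueueEntry); member order, buffer sizes, and
any extra fields are assumptions, not the patch's actual definitions.

/* Illustrative sketch only -- inferred from the hunks above. */
#include "postgres.h"
#include "commands/async.h"		/* NOTIFY_PAYLOAD_MAX_LENGTH */

typedef struct Notification
{
	char	   *channel;	/* channel the NOTIFY was issued on */
	char	   *payload;	/* payload string; "" when none was given */
	int32		srcPid;		/* PID of the notifying backend */
} Notification;

typedef struct AsyncQueueEntry
{
	Oid			dboid;		/* database of the sending backend */
	int32		srcPid;		/* assumed: carried so listeners can report it */
	char		channel[NAMEDATALEN];				/* assumed buffer sizes */
	char		payload[NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

The fixed-size payload buffer is what makes the NOTIFY_PAYLOAD_MAX_LENGTH
check in the utility.c hunk further down necessary.
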
diff -cr cvs/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c
*** cvs/src/backend/nodes/copyfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/copyfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 2761,2766 ****
--- 2761,2767 ----
  	NotifyStmt *newnode = makeNode(NotifyStmt);
  
  	COPY_STRING_FIELD(conditionname);
+ 	COPY_STRING_FIELD(payload);
  
  	return newnode;
  }
diff -cr cvs/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c
*** cvs/src/backend/nodes/equalfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/equalfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 1321,1326 ****
--- 1321,1327 ----
  _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
  {
  	COMPARE_STRING_FIELD(conditionname);
+ 	COMPARE_STRING_FIELD(payload);
  
  	return true;
  }
diff -cr cvs/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c
*** cvs/src/backend/nodes/outfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/outfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 1810,1815 ****
--- 1810,1816 ----
  	WRITE_NODE_TYPE("NOTIFY");
  
  	WRITE_STRING_FIELD(conditionname);
+ 	WRITE_STRING_FIELD(payload);
  }
  
  static void
diff -cr cvs/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c
*** cvs/src/backend/nodes/readfuncs.c	2009-10-31 14:47:48.000000000 +0100
--- cvs.build/src/backend/nodes/readfuncs.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 231,236 ****
--- 231,237 ----
  	READ_LOCALS(NotifyStmt);
  
  	READ_STRING_FIELD(conditionname);
+ 	READ_STRING_FIELD(payload);
  
  	READ_DONE();
  }
diff -cr cvs/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y
*** cvs/src/backend/parser/gram.y	2009-11-11 01:09:14.000000000 +0100
--- cvs.build/src/backend/parser/gram.y	2009-11-11 01:11:08.000000000 +0100
***************
*** 393,399 ****
  %type <boolean> opt_varying opt_timezone
  
  %type <ival>	Iconst SignedIconst
! %type <str>		Sconst comment_text
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
--- 393,399 ----
  %type <boolean> opt_varying opt_timezone
  
  %type <ival>	Iconst SignedIconst
! %type <str>		Sconst comment_text notify_payload
  %type <str>		RoleId opt_granted_by opt_boolean ColId_or_Sconst
  %type <list>	var_list
  %type <str>		ColId ColLabel var_name type_function_name param_name
***************
*** 5977,5986 ****
   *
   *****************************************************************************/
  
! NotifyStmt: NOTIFY ColId
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
  					$$ = (Node *)n;
  				}
  		;
--- 5977,5992 ----
   *
   *****************************************************************************/
  
! notify_payload:
! 			Sconst								{ $$ = $1; }
! 			| /*EMPTY*/							{ $$ = NULL; }
! 		;
! 
! NotifyStmt: NOTIFY ColId notify_payload
  				{
  					NotifyStmt *n = makeNode(NotifyStmt);
  					n->conditionname = $2;
+ 					n->payload = $3;
  					$$ = (Node *)n;
  				}
  		;
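
The new production makes the payload string optional and places it directly
after the channel name, with no separator.  As an illustration only (the
channel and payload values here are invented), the action above builds:

/* Illustrative only: what the grammar action produces for the two forms. */
NotifyStmt *n = makeNode(NotifyStmt);

/* NOTIFY virtual          ->  conditionname = "virtual", payload = NULL    */
/* NOTIFY virtual 'hello'  ->  conditionname = "virtual", payload = "hello" */
n->conditionname = "virtual";		/* $2, the ColId */
n->payload = "hello";				/* $3, or NULL when the Sconst is omitted */
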
diff -cr cvs/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c
*** cvs/src/backend/storage/ipc/ipci.c	2009-09-06 09:06:21.000000000 +0200
--- cvs.build/src/backend/storage/ipc/ipci.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 219,224 ****
--- 219,225 ----
  	 */
  	BTreeShmemInit();
  	SyncScanShmemInit();
+ 	AsyncShmemInit();
  
  #ifdef EXEC_BACKEND
  
diff -cr cvs/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c
*** cvs/src/backend/tcop/utility.c	2009-10-31 14:47:55.000000000 +0100
--- cvs.build/src/backend/tcop/utility.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 875,882 ****
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
! 
! 				Async_Notify(stmt->conditionname);
  			}
  			break;
  
--- 875,886 ----
  		case T_NotifyStmt:
  			{
  				NotifyStmt *stmt = (NotifyStmt *) parsetree;
! 
! 				if (stmt->payload &&
! 					strlen(stmt->payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1)
! 					ereport(ERROR,
! 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! 							 errmsg("payload string too long")));
  			}
  			break;
  
diff -cr cvs/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c
*** cvs/src/bin/initdb/initdb.c	2009-09-06 09:06:44.000000000 +0200
--- cvs.build/src/bin/initdb/initdb.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 2476,2481 ****
--- 2476,2482 ----
  		"pg_xlog",
  		"pg_xlog/archive_status",
  		"pg_clog",
+ 		"pg_notify",
  		"pg_subtrans",
  		"pg_twophase",
  		"pg_multixact/members",
diff -cr cvs/src/bin/psql/common.c cvs.build/src/bin/psql/common.c
*** cvs/src/bin/psql/common.c	2009-05-10 19:50:30.000000000 +0200
--- cvs.build/src/bin/psql/common.c	2009-11-11 01:11:08.000000000 +0100
***************
*** 555,562 ****
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
! 				notify->relname, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
--- 555,562 ----
  
  	while ((notify = PQnotifies(pset.db)))
  	{
! 		fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"),
! 				notify->relname, notify->extra, notify->be_pid);
  		fflush(pset.queryFout);
  		PQfreemem(notify);
  	}
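
No libpq change is needed for the payload itself: it arrives in the existing
extra member of PGnotify, which the psql hunk above already relies on.  A
minimal listening client might look like the sketch below; the connection
string and channel name are placeholders, and a real client would wait on
PQsocket() with select() rather than busy-loop.

#include <stdio.h>
#include "libpq-fe.h"

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");	/* placeholder */
	PGnotify   *notify;
	int			seen = 0;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* Register interest; another session then runs: NOTIFY virtual 'hello' */
	PQclear(PQexec(conn, "LISTEN virtual"));

	while (!seen)
	{
		PQconsumeInput(conn);
		while ((notify = PQnotifies(conn)) != NULL)
		{
			printf("channel \"%s\", payload \"%s\", from PID %d\n",
				   notify->relname, notify->extra, notify->be_pid);
			PQfreemem(notify);
			seen = 1;
		}
	}

	PQfinish(conn);
	return 0;
}
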
diff -cr cvs/src/include/access/slru.h cvs.build/src/include/access/slru.h
*** cvs/src/include/access/slru.h	2009-05-10 19:50:35.000000000 +0200
--- cvs.build/src/include/access/slru.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 16,21 ****
--- 16,40 ----
  #include "access/xlogdefs.h"
  #include "storage/lwlock.h"
  
+ /*
+  * Define segment size.  A page is the same BLCKSZ as is used everywhere
+  * else in Postgres.  The segment size can be chosen somewhat arbitrarily;
+  * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
+  * or 64K transactions for SUBTRANS.
+  *
+  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+  * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
+  * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
+  * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need
+  * take no explicit notice of that fact in this module, except when comparing
+  * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
+  *
+  * Note: this file currently assumes that segment file names will be four
+  * hex digits.	This sets a lower bound on the segment size (64K transactions
+  * for 32-bit TransactionIds).
+  */
+ #define SLRU_PAGES_PER_SEGMENT	32
+ 
  
  /*
   * Page status codes.  Note that these do not include the "dirty" bit.
diff -cr cvs/src/include/commands/async.h cvs.build/src/include/commands/async.h
*** cvs/src/include/commands/async.h	2009-09-06 09:08:02.000000000 +0200
--- cvs.build/src/include/commands/async.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 13,22 ****
  #ifndef ASYNC_H
  #define ASYNC_H
  
  extern bool Trace_notify;
  
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
--- 13,30 ----
  #ifndef ASYNC_H
  #define ASYNC_H
  
+ /*
+  * Maximum size of a NOTIFY payload, including the terminating '\0'; the
+  * payload string itself can therefore be at most one byte shorter.
+  */
+ #define NOTIFY_PAYLOAD_MAX_LENGTH (128)
+ 
  extern bool Trace_notify;
  
+ extern void AsyncShmemInit(void);
+ 
  /* notify-related SQL statements */
! extern void Async_Notify(const char *channel, const char *payload);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);
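
Since the limit above includes the terminating '\0', the longest payload the
utility.c check accepts is NOTIFY_PAYLOAD_MAX_LENGTH - 1, i.e. 127 bytes.  The
same bound, restated as a standalone hypothetical helper (not part of the
patch):

#include <stdbool.h>
#include <string.h>

#define NOTIFY_PAYLOAD_MAX_LENGTH (128)	/* mirrors the definition above */

/* Hypothetical helper: true if the payload fits in a fixed-size queue entry. */
static bool
payload_fits(const char *payload)
{
	/* A missing (NULL) payload is sent as an empty string and always fits. */
	if (payload == NULL)
		return true;
	/* strlen() excludes the '\0'; one byte of the buffer is reserved for it. */
	return strlen(payload) <= NOTIFY_PAYLOAD_MAX_LENGTH - 1;
}
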
diff -cr cvs/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
*** cvs/src/include/nodes/parsenodes.h	2009-11-11 01:09:15.000000000 +0100
--- cvs.build/src/include/nodes/parsenodes.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 2059,2064 ****
--- 2059,2065 ----
  {
  	NodeTag		type;
  	char	   *conditionname;	/* condition name to notify */
+ 	char	   *payload;		/* the payload string to be conveyed */
  } NotifyStmt;
  
  /* ----------------------
diff -cr cvs/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs/src/include/storage/lwlock.h	2009-05-10 19:53:12.000000000 +0200
--- cvs.build/src/include/storage/lwlock.h	2009-11-11 01:11:08.000000000 +0100
***************
*** 67,72 ****
--- 67,74 ----
  	AutovacuumLock,
  	AutovacuumScheduleLock,
  	SyncScanLock,
+ 	AsyncCtlLock,
+ 	AsyncQueueLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,