diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index e0e125a..d7dcbea 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -94,9 +94,9 @@ NOTIFY channel [ ,
- If the same channel name is signaled multiple times from the same
- transaction with identical payload strings, the
- database server can decide to deliver a single notification only.
+ If the same channel name is signaled multiple times with identical
+ payload strings within the same transaction, only one instance of the
+ notification event is delivered to listeners.
On the other hand, notifications with distinct payload strings will
always be delivered as distinct notifications. Similarly, notifications from
different transactions will never get folded into one notification.
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e9c580..3f5f054 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -135,6 +135,7 @@
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/hashutils.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
@@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
* State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
- * until and unless the transaction commits. pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.
+ * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
+ * until and unless the transaction commits. pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current (sub) transaction.
+ *
+ * We discard duplicate notify events issued in the same transaction.
+ * Hence, in addition to the list proper (which we need to track the order
+ * of the events, since we guarantee to deliver them in order), we build a
+ * hash table which we can probe to detect duplicates. Since building the
+ * hash table is somewhat expensive, we do so only once we have at least
+ * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
+ * before that we just scan the events linearly.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
- * successful subtransactions attach their lists to their parent's list.
- * Failed subtransactions simply discard their lists.
+ * successful subtransactions add their entries to their parent's list.
+ * Failed subtransactions simply discard their lists. Since these lists
+ * are independent, there may be notify events in a subtransaction's list
+ * that duplicate events in some ancestor (sub) transaction; we get rid of
+ * the dups when merging the subtransaction's list into its parent's.
*
* Note: the action and notify lists do not interact within a transaction.
* In particular, if a transaction does NOTIFY and then LISTEN on the same
@@ -343,7 +355,20 @@ typedef struct Notification
char *payload; /* payload string (can be empty) */
} Notification;
-static List *pendingNotifies = NIL; /* list of Notifications */
+typedef struct NotificationList
+{
+ List *events; /* list of Notification structs */
+ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
+} NotificationList;
+
+#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
+
+typedef struct NotificationHash
+{
+ Notification *event; /* => the actual Notification struct */
+} NotificationHash;
+
+static NotificationList *pendingNotifies = NULL; /* current list, if any */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
@@ -393,6 +418,9 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static void AddEventToPendingNotifies(Notification *n);
+static uint32 notification_hash(const void *key, Size keysize);
+static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/*
@@ -586,11 +614,19 @@ Async_Notify(const char *channel, const char *payload)
else
n->payload = "";
- /*
- * We want to preserve the order so we need to append every notification.
- * See comments at AsyncExistsPendingNotify().
- */
- pendingNotifies = lappend(pendingNotifies, n);
+ if (pendingNotifies == NULL)
+ {
+ /* First notify event in current (sub)xact */
+ pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
+ pendingNotifies->events = list_make1(n);
+ /* We certainly don't need a hashtable yet */
+ pendingNotifies->hashtab = NULL;
+ }
+ else
+ {
+ /* Append more events to existing list */
+ AddEventToPendingNotifies(n);
+ }
MemoryContextSwitchTo(oldcontext);
}
@@ -761,7 +797,7 @@ PreCommit_Notify(void)
{
ListCell *p;
- if (pendingActions == NIL && pendingNotifies == NIL)
+ if (!pendingActions && !pendingNotifies)
return; /* no relevant statements in this xact */
if (Trace_notify)
@@ -821,7 +857,7 @@ PreCommit_Notify(void)
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
- nextNotify = list_head(pendingNotifies);
+ nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
{
/*
@@ -1294,7 +1330,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* database OID in order to fill the page. So every page is always used up to
* the last byte which simplifies reading the page later.
*
- * We are passed the list cell (in pendingNotifies) containing the next
+ * We are passed the list cell (in pendingNotifies->events) containing the next
* notification to write and return the first still-unwritten cell back.
* Eventually we will return NULL indicating all is done.
*
@@ -1345,7 +1381,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
if (offset + qe.length <= QUEUE_PAGESIZE)
{
/* OK, so advance nextNotify past this item */
- nextNotify = lnext(pendingNotifies, nextNotify);
+ nextNotify = lnext(pendingNotifies->events, nextNotify);
}
else
{
@@ -1607,7 +1643,7 @@ AtSubStart_Notify(void)
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 1);
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
MemoryContextSwitchTo(old_cxt);
}
@@ -1621,7 +1657,7 @@ void
AtSubCommit_Notify(void)
{
List *parentPendingActions;
- List *parentPendingNotifies;
+ NotificationList *parentPendingNotifies;
parentPendingActions = linitial_node(List, upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
@@ -1634,16 +1670,41 @@ AtSubCommit_Notify(void)
*/
pendingActions = list_concat(parentPendingActions, pendingActions);
- parentPendingNotifies = linitial_node(List, upperPendingNotifies);
+ parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 2);
- /*
- * We could try to eliminate duplicates here, but it seems not worthwhile.
- */
- pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+ if (pendingNotifies == NULL)
+ {
+ /* easy, no notify events happened in current subxact */
+ pendingNotifies = parentPendingNotifies;
+ }
+ else if (parentPendingNotifies == NULL)
+ {
+ /* easy, subxact's list becomes parent's */
+ }
+ else
+ {
+ /*
+ * Formerly, we didn't bother to eliminate duplicates here, but now we
+ * must, else we fall foul of "Assert(!found)", either here or during
+ * a later attempt to build the parent-level hashtable.
+ */
+ NotificationList *childPendingNotifies = pendingNotifies;
+ ListCell *l;
+
+ pendingNotifies = parentPendingNotifies;
+ /* Insert all the subxact's events into parent, except for dups */
+ foreach(l, childPendingNotifies->events)
+ {
+ Notification *childn = (Notification *) lfirst(l);
+
+ if (!AsyncExistsPendingNotify(childn->channel, childn->payload))
+ AddEventToPendingNotifies(childn);
+ }
+ }
}
/*
@@ -1672,7 +1733,7 @@ AtSubAbort_Notify(void)
while (list_length(upperPendingNotifies) > my_level - 2)
{
- pendingNotifies = linitial_node(List, upperPendingNotifies);
+ pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
}
}
@@ -2102,50 +2163,149 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static bool
AsyncExistsPendingNotify(const char *channel, const char *payload)
{
- ListCell *p;
- Notification *n;
-
- if (pendingNotifies == NIL)
+ if (pendingNotifies == NULL)
return false;
if (payload == NULL)
payload = "";
- /*----------
- * We need to append new elements to the end of the list in order to keep
- * the order. However, on the other hand we'd like to check the list
- * backwards in order to make duplicate-elimination a tad faster when the
- * same condition is signaled many times in a row. So as a compromise we
- * check the tail element first which we can access directly. If this
- * doesn't match, we check the whole list.
- *
- * As we are not checking our parents' lists, we can still get duplicates
- * in combination with subtransactions, like in:
- *
- * begin;
- * notify foo '1';
- * savepoint foo;
- * notify foo '1';
- * commit;
- *----------
- */
- n = (Notification *) llast(pendingNotifies);
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
- return true;
-
- foreach(p, pendingNotifies)
+ if (pendingNotifies->hashtab != NULL)
{
- n = (Notification *) lfirst(p);
-
- if (strcmp(n->channel, channel) == 0 &&
- strcmp(n->payload, payload) == 0)
+ /* Use the hash table to probe for a match */
+ Notification n;
+ Notification *k;
+
+ /* set up a dummy Notification struct */
+ n.channel = unconstify(char *, channel);
+ n.payload = unconstify(char *, payload);
+ k = &n;
+ /* ... and probe */
+ if (hash_search(pendingNotifies->hashtab,
+ &k,
+ HASH_FIND,
+ NULL))
return true;
}
+ else
+ {
+ /* Must scan the event list */
+ ListCell *l;
+
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *n = (Notification *) lfirst(l);
+
+ if (strcmp(n->channel, channel) == 0 &&
+ strcmp(n->payload, payload) == 0)
+ return true;
+ }
+ }
return false;
}
+/*
+ * Add a notification event to a pre-existing pendingNotifies list.
+ *
+ * Because pendingNotifies->events is already nonempty, this works
+ * correctly no matter what CurrentMemoryContext is.
+ */
+static void
+AddEventToPendingNotifies(Notification *n)
+{
+ Assert(pendingNotifies->events != NIL);
+
+ /* Create the hash table if it's time to */
+ if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
+ pendingNotifies->hashtab == NULL)
+ {
+ HASHCTL hash_ctl;
+ ListCell *l;
+
+ /* Create the hash table */
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(Notification *);
+ hash_ctl.entrysize = sizeof(NotificationHash);
+ hash_ctl.hash = notification_hash;
+ hash_ctl.match = notification_match;
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->hashtab =
+ hash_create("Pending Notifies",
+ 256L,
+ &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+ /* Insert all the already-existing events */
+ foreach(l, pendingNotifies->events)
+ {
+ Notification *oldn = (Notification *) lfirst(l);
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &oldn,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = oldn;
+ }
+ }
+
+ /* Add new event to the list, in order */
+ pendingNotifies->events = lappend(pendingNotifies->events, n);
+
+ /* Add event to the hash table if needed */
+ if (pendingNotifies->hashtab != NULL)
+ {
+ NotificationHash *hentry;
+ bool found;
+
+ hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+ &n,
+ HASH_ENTER,
+ &found);
+ Assert(!found);
+ hentry->event = n;
+ }
+}
+
+/*
+ * notification_hash: hash function for notification hash table
+ *
+ * The hash "keys" are pointers to Notification structs.
+ */
+static uint32
+notification_hash(const void *key, Size keysize)
+{
+ const Notification *k = *(const Notification *const *) key;
+ uint32 hashc;
+ uint32 hashp;
+
+ Assert(keysize == sizeof(Notification *));
+ /* We just XOR the hashes for the two strings */
+ hashc = DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ (int) strlen((const char *) k->channel)));
+ hashp = DatumGetUInt32(hash_any((const unsigned char *) k->payload,
+ (int) strlen((const char *) k->payload)));
+ return hashc ^ hashp;
+}
+
+/*
+ * notification_match: match function to use with notification_hash
+ */
+static int
+notification_match(const void *key1, const void *key2, Size keysize)
+{
+ const Notification *k1 = *(const Notification *const *) key1;
+ const Notification *k2 = *(const Notification *const *) key2;
+
+ Assert(keysize == sizeof(Notification *));
+ if (strcmp(k1->channel, k2->channel) == 0 &&
+ strcmp(k1->payload, k2->payload) == 0)
+ return 0; /* equal */
+ return 1; /* not equal */
+}
+
/* Clear the pendingActions and pendingNotifies lists. */
static void
ClearPendingActionsAndNotifies(void)
@@ -2158,5 +2318,5 @@ ClearPendingActionsAndNotifies(void)
* pointers.
*/
pendingActions = NIL;
- pendingNotifies = NIL;
+ pendingNotifies = NULL;
}
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 60ba506..7ad26b7 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -42,8 +42,6 @@ step notifys1:
notifier: NOTIFY "c1" with payload "payload" from notifier
notifier: NOTIFY "c2" with payload "payload" from notifier
-notifier: NOTIFY "c1" with payload "payload" from notifier
-notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payloads" from notifier
notifier: NOTIFY "c2" with payload "payloads" from notifier