diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c index 0c274322bd..1494a35a5a 100644 --- a/contrib/tcn/tcn.c +++ b/contrib/tcn/tcn.c @@ -161,7 +161,7 @@ triggered_change_notification(PG_FUNCTION_ARGS) strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\''); } - Async_Notify(channel, payload->data); + Async_Notify(channel, payload->data, NOTIFY_COLLAPSE_MODE_MAYBE); } ReleaseSysCache(indexTuple); break; diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml index e0e125a2a2..96e0d7a990 100644 --- a/doc/src/sgml/ref/notify.sgml +++ b/doc/src/sgml/ref/notify.sgml @@ -21,7 +21,8 @@ PostgreSQL documentation -NOTIFY channel [ , payload ] +NOTIFY channel [ , payload [ , collapse_mode ] ] + @@ -93,20 +94,6 @@ NOTIFY channel [ , - - If the same channel name is signaled multiple times from the same - transaction with identical payload strings, the - database server can decide to deliver a single notification only. - On the other hand, notifications with distinct payload strings will - always be delivered as distinct notifications. Similarly, notifications from - different transactions will never get folded into one notification. - Except for dropping later instances of duplicate notifications, - NOTIFY guarantees that notifications from the same - transaction get delivered in the order they were sent. It is also - guaranteed that messages from different transactions are delivered in - the order in which the transactions committed. - - It is common for a client that executes NOTIFY to be listening on the same notification channel itself. In that case @@ -121,6 +108,41 @@ NOTIFY channel [ , + + + + + + Ordering and collapsing of notifications + + + If the same channel name is signaled multiple times from the same + transaction with identical payload strings, the + database server can decide to deliver a single notification only, + when the value of the collapse_mode parameter is + 'maybe' or '' (the empty string). + + If the 'never' collapse mode is specified, the server will + deliver all notifications, including duplicates. Turning off deduplication + in this way can considerably speed up transactions that emit large numbers + of notifications. + + Removal of duplicate notifications takes place within transaction block, + finished with COMMIT, END or SAVEPOINT. + + + + Notifications with distinct payload strings will + always be delivered as distinct notifications. Similarly, notifications from + different transactions will never get folded into one notification. + Except for dropping later instances of duplicate notifications, + NOTIFY guarantees that notifications from the same + transaction get delivered in the order they were sent. It is also + guaranteed that messages from different transactions are delivered in + the order in which the transactions committed. + + + @@ -147,6 +169,16 @@ NOTIFY channel [ , + + collapse_mode + + + The collapse mode to apply when identical notifications are issued within + a transaction. The acceptable values are 'maybe' (the + default) and 'never'. + + + @@ -190,6 +222,11 @@ NOTIFY channel [ , NOTIFY command if you need to work with non-constant channel names and payloads. + + There is a three-argument version, pg_notify(text, + text, text) where the third argument takes + the value of the collapse_mode parameter. + @@ -210,6 +247,21 @@ Asynchronous notification "virtual" with payload "This is the payload" received LISTEN foo; SELECT pg_notify('fo' || 'o', 'pay' || 'load'); Asynchronous notification "foo" with payload "payload" received from server process with PID 14728. + +/* Identical messages from same (sub-) transaction can be eliminated - unless you use the 'never' collapse mode */ +LISTEN bar; +BEGIN; +NOTIFY bar, 'Coffee please'; +NOTIFY bar, 'Coffee please'; +NOTIFY bar, 'Milk please'; +NOTIFY bar, 'Milk please', 'never'; +SAVEPOINT s; +NOTIFY bar, 'Coffee please'; +COMMIT; +Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517. +Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517. +Asynchronous notification "bar" with payload "Milk please" received from server process with PID 31517. +Asynchronous notification "bar" with payload "Coffee please" received from server process with PID 31517. diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index ee7c6d41b4..5bcde40cfd 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -15,99 +15,98 @@ /*------------------------------------------------------------------------- * Async Notification Model as of 9.0: * - * 1. Multiple backends on same machine. Multiple backends listening on - * several channels. (Channels are also called "conditions" in other - * parts of the code.) + * 1. Multiple backends on same machine. Multiple backends listening on several + * channels. (Channels are also called "conditions" in other parts of the + * code.) * * 2. There is one central queue in disk-based storage (directory pg_notify/), - * with actively-used pages mapped into shared memory by the slru.c module. - * All notification messages are placed in the queue and later read out - * by listening backends. + * with actively-used pages mapped into shared memory by the slru.c module. + * All notification messages are placed in the queue and later read out by + * listening backends. * - * There is no central knowledge of which backend listens on which channel; - * every backend has its own list of interesting channels. + * There is no central knowledge of which backend listens on which channel; + * every backend has its own list of interesting channels. * - * Although there is only one queue, notifications are treated as being - * database-local; this is done by including the sender's database OID - * in each notification message. Listening backends ignore messages - * that don't match their database OID. This is important because it - * ensures senders and receivers have the same database encoding and won't - * misinterpret non-ASCII text in the channel name or payload string. + * Although there is only one queue, notifications are treated as being + * database-local; this is done by including the sender's database OID in + * each notification message. Listening backends ignore messages that don't + * match their database OID. This is important because it ensures senders + * and receivers have the same database encoding and won't misinterpret + * non-ASCII text in the channel name or payload string. * - * Since notifications are not expected to survive database crashes, - * we can simply clean out the pg_notify data at any reboot, and there - * is no need for WAL support or fsync'ing. + * Since notifications are not expected to survive database crashes, we can + * simply clean out the pg_notify data at any reboot, and there is no need + * for WAL support or fsync'ing. * * 3. Every backend that is listening on at least one channel registers by - * entering its PID into the array in AsyncQueueControl. It then scans all - * incoming notifications in the central queue and first compares the - * database OID of the notification with its own database OID and then - * compares the notified channel with the list of channels that it listens - * to. In case there is a match it delivers the notification event to its - * frontend. Non-matching events are simply skipped. - * - * 4. The NOTIFY statement (routine Async_Notify) stores the notification in - * a backend-local list which will not be processed until transaction end. - * - * Duplicate notifications from the same transaction are sent out as one - * notification only. This is done to save work when for example a trigger - * on a 2 million row table fires a notification for each row that has been - * changed. If the application needs to receive every single notification - * that has been sent, it can easily add some unique string into the extra - * payload parameter. - * - * When the transaction is ready to commit, PreCommit_Notify() adds the - * pending notifications to the head of the queue. The head pointer of the - * queue always points to the next free position and a position is just a - * page number and the offset in that page. This is done before marking the - * transaction as committed in clog. If we run into problems writing the - * notifications, we can still call elog(ERROR, ...) and the transaction - * will roll back. - * - * Once we have put all of the notifications into the queue, we return to - * CommitTransaction() which will then do the actual transaction commit. - * - * After commit we are called another time (AtCommit_Notify()). Here we - * make the actual updates to the effective listen state (listenChannels). - * - * Finally, after we are out of the transaction altogether, we check if - * we need to signal listening backends. In SignalBackends() we scan the - * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal - * to every listening backend (we don't know which backend is listening on - * which channel so we must signal them all). We can exclude backends that - * are already up to date, though. We don't bother with a self-signal - * either, but just process the queue directly. - * - * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler - * sets the process's latch, which triggers the event to be processed - * immediately if this backend is idle (i.e., it is waiting for a frontend - * command and is not within a transaction block. C.f. - * ProcessClientReadInterrupt()). Otherwise the handler may only set a - * flag, which will cause the processing to occur just before we next go - * idle. - * - * Inbound-notify processing consists of reading all of the notifications - * that have arrived since scanning last time. We read every notification - * until we reach either a notification from an uncommitted transaction or - * the head pointer's position. Then we check if we were the laziest - * backend: if our pointer is set to the same position as the global tail - * pointer is set, then we move the global tail pointer ahead to where the - * second-laziest backend is (in general, we take the MIN of the current - * head position and all active backends' new tail pointers). Whenever we - * move the global tail pointer we also truncate now-unused pages (i.e., - * delete files in pg_notify/ that are no longer used). - * - * An application that listens on the same channel it notifies will get - * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, - * by comparing be_pid in the NOTIFY message to the application's own backend's + * entering its PID into the array in AsyncQueueControl. It then scans all + * incoming notifications in the central queue and first compares the + * database OID of the notification with its own database OID and then + * compares the notified channel with the list of channels that it listens + * to. In case there is a match it delivers the notification event to its + * frontend. Non-matching events are simply skipped. + * + * 4. The NOTIFY statement (routine Async_Notify) stores the notification in a + * backend-local list which will not be processed until transaction end. + * + * Duplicate notifications from the same transaction are, by default, sent + * out as one notification only. This is intended to save work when for + * example a trigger on a 2 million row table fires a notification for each + * row that has been changed. However, since the check for duplicates can be + * expensive, with O(n2) complexity, the collapse_mode argument allows + * turning this feature off. + * + * When the transaction is ready to commit, PreCommit_Notify() adds the + * pending notifications to the head of the queue. The head pointer of the + * queue always points to the next free position and a position is just a + * page number and the offset in that page. This is done before marking the + * transaction as committed in clog. If we run into problems writing the + * notifications, we can still call elog(ERROR, ...) and the transaction will + * roll back. + * + * Once we have put all of the notifications into the queue, we return to + * CommitTransaction() which will then do the actual transaction commit. + * + * After commit we are called another time (AtCommit_Notify()). Here we make + * the actual updates to the effective listen state (listenChannels). + * + * Finally, after we are out of the transaction altogether, we check if we + * need to signal listening backends. In SignalBackends() we scan the list + * of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal to every + * listening backend (we don't know which backend is listening on which + * channel so we must signal them all). We can exclude backends that are + * already up to date, though. We don't bother with a self-signal either, + * but just process the queue directly. + * + * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler sets + * the process's latch, which triggers the event to be processed immediately + * if this backend is idle (i.e., it is waiting for a frontend command and is + * not within a transaction block. C.f. ProcessClientReadInterrupt()). + * Otherwise the handler may only set a flag, which will cause the processing + * to occur just before we next go idle. + * + * Inbound-notify processing consists of reading all of the notifications + * that have arrived since scanning last time. We read every notification + * until we reach either a notification from an uncommitted transaction or + * the head pointer's position. Then we check if we were the laziest backend: + * if our pointer is set to the same position as the global tail pointer is + * set, then we move the global tail pointer ahead to where the + * second-laziest backend is (in general, we take the MIN of the current head + * position and all active backends' new tail pointers). Whenever we move the + * global tail pointer we also truncate now-unused pages (i.e., delete files + * in pg_notify/ that are no longer used). + * + * An application that listens on the same channel it notifies will get NOTIFY + * messages for its own NOTIFYs. These can be ignored, if not useful, by + * comparing be_pid in the NOTIFY message to the application's own backend's * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the * frontend during startup.) The above design guarantees that notifies from * other backends will never be missed by ignoring self-notifies. * * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS) - * can be varied without affecting anything but performance. The maximum - * amount of notification data that can be queued at one time is determined - * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below. + * can be varied without affecting anything but performance. The maximum amount + * of notification data that can be queued at one time is determined by slru.c's + * wraparound limit; see QUEUE_MAX_PAGE below. *------------------------------------------------------------------------- */ @@ -507,9 +506,21 @@ AsyncShmemInit(void) */ Datum pg_notify(PG_FUNCTION_ARGS) +{ + return pg_notify_3args(fcinfo); +} + + +/* + * pg_notify_3args + * SQL function to send a notification event, 3-argument version + */ +Datum +pg_notify_3args(PG_FUNCTION_ARGS) { const char *channel; const char *payload; + NotifyCollapseMode collapse_mode; if (PG_ARGISNULL(0)) channel = ""; @@ -521,15 +532,61 @@ pg_notify(PG_FUNCTION_ARGS) else payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); + if (PG_NARGS() < 3 || PG_ARGISNULL(2)) + { + collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE; + } + else + { + collapse_mode = str2collapse_mode(text_to_cstring(PG_GETARG_TEXT_PP(2))); + } + /* For NOTIFY as a statement, this is checked in ProcessUtility */ PreventCommandDuringRecovery("NOTIFY"); - Async_Notify(channel, payload); + Async_Notify(channel, payload, collapse_mode); PG_RETURN_VOID(); } +NotifyCollapseMode str2collapse_mode(const char *mode_str) +{ + NotifyCollapseMode collapse_mode; + if (!mode_str) + { + collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE; + } + else + { + if (strlen(mode_str) == 0) { + collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE; + } + else if (strcmp(mode_str, "always") == 0) + { + collapse_mode = NOTIFY_COLLAPSE_MODE_ALWAYS; + } + else if (strcmp(mode_str, "never") == 0) + { + collapse_mode = NOTIFY_COLLAPSE_MODE_NEVER; + } + else if (strcmp(mode_str, "maybe") == 0) + { + collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid collapse_mode value '%s'", mode_str))); + } + } + + return collapse_mode; + +} + + /* * Async_Notify * @@ -540,10 +597,11 @@ pg_notify(PG_FUNCTION_ARGS) * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ void -Async_Notify(const char *channel, const char *payload) +Async_Notify(const char *channel, const char *payload, NotifyCollapseMode collapse_mode) { Notification *n; MemoryContext oldcontext; + bool removeDuplicates = false; if (IsParallelWorker()) elog(ERROR, "cannot send notifications from a parallel worker"); @@ -570,9 +628,17 @@ Async_Notify(const char *channel, const char *payload) errmsg("payload string too long"))); } - /* no point in making duplicate entries in the list ... */ - if (AsyncExistsPendingNotify(channel, payload)) - return; + if (collapse_mode == NOTIFY_COLLAPSE_MODE_ALWAYS || collapse_mode == NOTIFY_COLLAPSE_MODE_MAYBE) + { + removeDuplicates = true; + } + + if (removeDuplicates) + { + /* remove duplicate entries in the list */ + if (AsyncExistsPendingNotify(channel, payload)) + return; + } /* * The notification list needs to live until end of transaction, so store diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 02b500e5a0..3cda7773b5 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -521,7 +521,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type opt_varying opt_timezone opt_no_inherit %type Iconst SignedIconst -%type Sconst comment_text notify_payload +%type Sconst comment_text notify_payload notify_collapse_mode %type RoleId opt_boolean_or_string %type var_list %type ColId ColLabel var_name type_function_name param_name @@ -9809,18 +9809,32 @@ opt_instead: * *****************************************************************************/ -NotifyStmt: NOTIFY ColId notify_payload +NotifyStmt: + NOTIFY ColId + { + NotifyStmt *n = makeNode(NotifyStmt); + n->conditionname = $2; + n->payload = NULL; + n->collapse_mode = ""; + $$ = (Node *)n; + } + | NOTIFY ColId notify_payload notify_collapse_mode { NotifyStmt *n = makeNode(NotifyStmt); n->conditionname = $2; n->payload = $3; + n->collapse_mode = $4; $$ = (Node *)n; } ; notify_payload: ',' Sconst { $$ = $2; } - | /*EMPTY*/ { $$ = NULL; } + ; + +notify_collapse_mode: + ',' Sconst { $$ = $2; } + | /*EMPTY*/ { $$ = ""; } ; ListenStmt: LISTEN ColId diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index ad3a68a79b..2927b7de50 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -611,7 +611,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, NotifyStmt *stmt = (NotifyStmt *) parsetree; PreventCommandDuringRecovery("NOTIFY"); - Async_Notify(stmt->conditionname, stmt->payload); + Async_Notify(stmt->conditionname, stmt->payload, str2collapse_mode(stmt->collapse_mode)); } break; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index c4fc50dceb..e0fa28c08a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7670,6 +7670,11 @@ proname => 'pg_notify', proisstrict => 'f', provolatile => 'v', proparallel => 'r', prorettype => 'void', proargtypes => 'text text', prosrc => 'pg_notify' }, +{ oid => '3423', descr => 'send a notification event', + proname => 'pg_notify', proisstrict => 'f', provolatile => 'v', + proparallel => 'r', prorettype => 'void', proargtypes => 'text text text', + prosrc => 'pg_notify_3args' }, + { oid => '3296', descr => 'get the fraction of the asynchronous notification queue currently in use', proname => 'pg_notification_queue_usage', provolatile => 'v', diff --git a/src/include/commands/async.h b/src/include/commands/async.h index d5868c42a0..5f3f40af7c 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -32,8 +32,16 @@ extern void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid); +/* collapse mode argument to NOTIFY and pg_notify() */ +typedef enum NotifyCollapseMode { + NOTIFY_COLLAPSE_MODE_MAYBE, + NOTIFY_COLLAPSE_MODE_NEVER, + NOTIFY_COLLAPSE_MODE_ALWAYS +} NotifyCollapseMode; +extern NotifyCollapseMode str2collapse_mode(const char *mode_str); + /* notify-related SQL statements */ -extern void Async_Notify(const char *channel, const char *payload); +extern void Async_Notify(const char *channel, const char *payload, NotifyCollapseMode collapse_mode); extern void Async_Listen(const char *channel); extern void Async_Unlisten(const char *channel); extern void Async_UnlistenAll(void); @@ -54,4 +62,5 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(void); + #endif /* ASYNC_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index a49b0131cf..75840e67a5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2926,11 +2926,13 @@ typedef struct RuleStmt * Notify Statement * ---------------------- */ + typedef struct NotifyStmt { NodeTag type; char *conditionname; /* condition name to notify */ char *payload; /* the payload string, or NULL if none */ + char *collapse_mode; /* the collapse mode (empty string by default, which is equivalent to 'maybe') */ } NotifyStmt; /* ---------------------- diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out index 19cbe38e63..beff62b895 100644 --- a/src/test/regress/expected/async.out +++ b/src/test/regress/expected/async.out @@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1'); (1 row) +SELECT pg_notify('notify_async1','sample message1','maybe'); + pg_notify +----------- + +(1 row) + +SELECT pg_notify('notify_async1','sample_message1','never'); + pg_notify +----------- + +(1 row) + SELECT pg_notify('notify_async1',''); pg_notify ----------- @@ -29,9 +41,14 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________ ERROR: channel name too long --Should work. Valid NOTIFY/LISTEN/UNLISTEN commands NOTIFY notify_async2; +NOTIFY notify_async2, '', 'maybe'; +NOTIFY notify_async2, '', 'never'; LISTEN notify_async2; UNLISTEN notify_async2; UNLISTEN *; +--Should fail. Invalid collapse mode +NOTIFY notify_async2, '', 'foobar'; +ERROR: invalid collapse_mode value 'foobar' -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage(); diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql index 40f6e01538..f95292e3e4 100644 --- a/src/test/regress/sql/async.sql +++ b/src/test/regress/sql/async.sql @@ -4,6 +4,8 @@ --Should work. Send a valid message via a valid channel name SELECT pg_notify('notify_async1','sample message1'); +SELECT pg_notify('notify_async1','sample message1','maybe'); +SELECT pg_notify('notify_async1','sample_message1','never'); SELECT pg_notify('notify_async1',''); SELECT pg_notify('notify_async1',NULL); @@ -14,10 +16,15 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________ --Should work. Valid NOTIFY/LISTEN/UNLISTEN commands NOTIFY notify_async2; +NOTIFY notify_async2, '', 'maybe'; +NOTIFY notify_async2, '', 'never'; LISTEN notify_async2; UNLISTEN notify_async2; UNLISTEN *; +--Should fail. Invalid collapse mode +NOTIFY notify_async2, '', 'foobar'; + -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage();