diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..44442e927ff 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes strictly before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* backend could skip queue to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +675,7 @@ AsyncShmemInit(void)
 		QUEUE_BACKEND_DBOID(i) = InvalidOid;
 		QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 		SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+		SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 		QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 	}
 }
@@ -2009,9 +2017,23 @@ SignalBackends(void)
 	 * Even though we may take and release NotifyQueueLock multiple times
 	 * while writing, the heavyweight lock guarantees this region contains
 	 * only our messages.  Therefore, any backend still positioned at the
-	 * queue head from before our write can be safely advanced to the current
+	 * queue head from before our write can be advised to skip to the current
 	 * queue head without waking it.
 	 *
+	 * We use the advisoryPos field rather than directly modifying pos,
+	 * because the listening backend might be concurrently reading
+	 * notifications using its local copy of pos.  The backend controls its
+	 * own pos field and adopts advisoryPos the next time it reads the queue.
+	 * Until then its effective position is the later of pos and advisoryPos;
+	 * that is what we compare with queueHeadBeforeWrite, so that a series of
+	 * uninteresting writes keeps extending the same skip.
+	 *
+	 * Only the backend itself moves pos, and advisoryPos is not considered
+	 * when advancing the queue tail.  So we deliberately leave our local copy
+	 * of pos alone: later checks in this loop must see the position the
+	 * backend has actually published, so that a listener that is holding
+	 * back the tail can still be woken if it falls too far behind.
+	 *
 	 * False-positive possibility: if a backend was previously signaled but
 	 * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
 	 * true).  This is safe - the backend will advance its pointer when it
@@ -2038,7 +2060,7 @@ SignalBackends(void)
 		if (pendingNotifies != NULL &&
-			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+			QUEUE_POS_EQUAL(QUEUE_POS_MAX(pos, QUEUE_BACKEND_ADVISORY_POS(i)),
+							queueHeadBeforeWrite))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			pos = queueHeadAfterWrite;
+			QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
 		}
 
@@ -2297,6 +2319,30 @@ asyncQueueReadAllNotifications(void)
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
+
+	/*
+	 * Check whether another backend has advised us to skip ahead (see
+	 * SignalBackends).  Advice is only given while our effective position
+	 * is exactly at the queue head from before the advising backend's
+	 * write, and only to backends not signaled for that write, so the
+	 * region between pos and advisoryPos holds nothing we need to read.
+	 *
+	 * Whether or not we adopt it, reset advisoryPos to our current pos so
+	 * that stale advice can never be mistaken for fresh advice later.  (Any
+	 * other code that reinitializes pos must reset advisoryPos likewise.)
+	 */
+	{
+		QueuePosition advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+		if (QUEUE_POS_PRECEDES(pos, advisoryPos))
+		{
+			pos = advisoryPos;
+			QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		}
+
+		QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = pos;
+	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))