From c403098ae4e4d06f109eb6292a67c6577e123010 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Sun, 19 Oct 2025 08:35:44 +0200 Subject: [PATCH] Implement idea #3 --- src/backend/commands/async.c | 150 ++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 65 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e6556fb8d1..b34e4a2247b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -264,6 +264,11 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is @@ -2304,6 +2309,7 @@ asyncQueueReadAllNotifications(void) volatile QueuePosition pos; QueuePosition head; Snapshot snapshot; + bool reachedStop; /* page_buffer must be adequately aligned, so use a union */ union @@ -2372,77 +2378,69 @@ asyncQueueReadAllNotifications(void) * It is possible that we fail while trying to send a message to our * frontend (for example, because of encoding conversion failure). If * that happens it is critical that we not try to send the same message - * over and over again. Therefore, we place a PG_TRY block here that will - * forcibly advance our queue position before we lose control to an error. - * (We could alternatively retake NotifyQueueLock and move the position - * before handling each individual message, but that seems like too much - * lock traffic.) + * over and over again. Therefore, we must advance our queue position + * regularly as we process messages. + * + * We must also be careful about concurrency: SignalBackends() can + * directly advance our position while we're reading. To preserve such + * advancement, asyncQueueProcessPageEntries updates our position in + * shared memory for each message, only writing if our position is ahead. + * Shared lock is sufficient since we're only updating our own position. */ - PG_TRY(); + do { - bool reachedStop; + int64 curpage = QUEUE_POS_PAGE(pos); + int curoffset = QUEUE_POS_OFFSET(pos); + int slotno; + int copysize; - do + /* + * We copy the data from SLRU into a local buffer, so as to avoid + * holding the SLRU lock while we are examining the entries and + * possibly transmitting them to our frontend. Copy only the part + * of the page we will actually inspect. + */ + slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, + InvalidTransactionId); + if (curpage == QUEUE_POS_PAGE(head)) { - int64 curpage = QUEUE_POS_PAGE(pos); - int curoffset = QUEUE_POS_OFFSET(pos); - int slotno; - int copysize; + /* we only want to read as far as head */ + copysize = QUEUE_POS_OFFSET(head) - curoffset; + if (copysize < 0) + copysize = 0; /* just for safety */ + } + else + { + /* fetch all the rest of the page */ + copysize = QUEUE_PAGESIZE - curoffset; + } + memcpy(page_buffer.buf + curoffset, + NotifyCtl->shared->page_buffer[slotno] + curoffset, + copysize); + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); - /* - * We copy the data from SLRU into a local buffer, so as to avoid - * holding the SLRU lock while we are examining the entries and - * possibly transmitting them to our frontend. Copy only the part - * of the page we will actually inspect. - */ - slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, - InvalidTransactionId); - if (curpage == QUEUE_POS_PAGE(head)) - { - /* we only want to read as far as head */ - copysize = QUEUE_POS_OFFSET(head) - curoffset; - if (copysize < 0) - copysize = 0; /* just for safety */ - } - else - { - /* fetch all the rest of the page */ - copysize = QUEUE_PAGESIZE - curoffset; - } - memcpy(page_buffer.buf + curoffset, - NotifyCtl->shared->page_buffer[slotno] + curoffset, - copysize); - /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + /* + * Process messages up to the stop position, end of page, or an + * uncommitted message. + * + * Our stop position is what we found to be the head's position + * when we entered this function. It might have changed already. + * But if it has, we will receive (or have already received and + * queued) another signal and come here again. + * + * We are not holding NotifyQueueLock here! The queue can only + * extend beyond the head pointer (see above). + * asyncQueueProcessPageEntries will update our backend's position + * for each message to ensure we don't reprocess messages if we fail + * partway through, and to preserve any direct advancement that + * SignalBackends() might perform concurrently. + */ + reachedStop = asyncQueueProcessPageEntries(&pos, head, + page_buffer.buf, + snapshot); - /* - * Process messages up to the stop position, end of page, or an - * uncommitted message. - * - * Our stop position is what we found to be the head's position - * when we entered this function. It might have changed already. - * But if it has, we will receive (or have already received and - * queued) another signal and come here again. - * - * We are not holding NotifyQueueLock here! The queue can only - * extend beyond the head pointer (see above) and we leave our - * backend's pointer where it is so nobody will truncate or - * rewrite pages under us. Especially we don't want to hold a lock - * while sending the notifications to the frontend. - */ - reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf, - snapshot); - } while (!reachedStop); - } - PG_FINALLY(); - { - /* Update shared state */ - LWLockAcquire(NotifyQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyProcNumber) = pos; - LWLockRelease(NotifyQueueLock); - } - PG_END_TRY(); + } while (!reachedStop); /* Done with snapshot */ UnregisterSnapshot(snapshot); @@ -2490,6 +2488,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, */ reachedEndOfPage = asyncQueueAdvance(current, qe->length); + /* + * Update our position in shared memory immediately after advancing, + * before we attempt to process the message. This ensures we won't + * reprocess this message if NotifyMyFrontEnd fails. + * + * Only write if our position is ahead of the shared position. + * If the shared position is already ahead (due to direct advancement + * by SignalBackends), preserve it by not overwriting. + */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + { + QueuePosition sharedPos = QUEUE_BACKEND_POS(MyProcNumber); + + if (QUEUE_POS_PRECEDES(sharedPos, *current)) + QUEUE_BACKEND_POS(MyProcNumber) = *current; + } + LWLockRelease(NotifyQueueLock); + /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { @@ -2515,6 +2531,10 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * messages. */ *current = thisentry; + /* Update shared memory to reflect the backed-up position */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyProcNumber) = *current; + LWLockRelease(NotifyQueueLock); reachedStop = true; break; } -- 2.50.1