diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -70,14 +70,14 @@
  * CommitTransaction() which will then do the actual transaction commit.
  *
  * After commit we are called another time (AtCommit_Notify()).  Here we
- * make any actual updates to the local listen state (listenChannels) and
- * shared channel hash table (channelHash).  Then we signal any backends
- * that may be interested in our messages (including our own backend,
- * if listening).  This is done by SignalBackends(), which consults the
- * shared channel hash table to identify listeners for the channels that
- * have pending notifications in the current database.  Each selected
- * backend is marked as having a wakeup pending to avoid duplicate signals,
- * and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ * make any actual updates to the effective listen state (channelHash).
+ * Then we signal any backends that may be interested in our messages
+ * (including our own backend, if listening).  This is done by
+ * SignalBackends(), which consults the shared channel hash table to
+ * identify listeners for the channels that have pending notifications
+ * in the current database.  Each selected backend is marked as having a
+ * wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ * signal is sent to it.
  *
  * When writing notifications, PreCommit_Notify() records the queue head
  * position both before and after the write.  Because all writers serialize
@@ -2355,6 +2355,13 @@ asyncQueueReadAllNotifications(void)
 	 * (We could alternatively retake NotifyQueueLock and move the position
 	 * before handling each individual message, but that seems like too much
 	 * lock traffic.)
+	 *
+	 * SignalBackends() can also directly advance our position in shared
+	 * memory while we are reading.  To avoid overwriting such an advancement
+	 * with a stale value, the PG_FINALLY block below only ever moves our
+	 * shared position forward.  A shared lock on NotifyQueueLock suffices for
+	 * that, provided SignalBackends() modifies other backends' positions only
+	 * while holding the lock exclusively.
 	 */
 	PG_TRY();
 	{
@@ -2415,9 +2422,15 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_FINALLY();
 	{
-		/* Update shared state */
+		QueuePosition newpos = pos;
+
+		/*
+		 * Update shared state, but never move our position backwards: it
+		 * may have been advanced concurrently by SignalBackends().
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_POS(MyProcNumber) =
+			QUEUE_POS_MAX(QUEUE_BACKEND_POS(MyProcNumber), newpos);
 		LWLockRelease(NotifyQueueLock);
 	}
 	PG_END_TRY();