From: Noah Misch

Cosmetic code review of AdvanceXLInsertBuffer().  This mostly changes material
new in commit bc22dc0e0ddc2dcb6043a732415019cc6b6bf683, but some of the
changes involve older code.

==== Changes that avoid work -- move to separate patch(es)

Remove "upto = NewPageBeginPtr - 1;" so the "opportunistic" case doesn't wait.

When the pg_atomic_compare_exchange_u64(&XLogCtl->InitializedEnd, ...) loop
advanced NewPageBeginPtr, we'd forget that progress for the next iteration
through the outer loop.  Use NewPageBeginPtr to control both outer and inner
loops, so we don't waste a compare-exchange in the outer loop.  (See the first
sketch below.)

==== Code cosmetics

xlog.c has a standard that variables with names containing "End" refer to the
first byte position lacking the property in question.  Rename some variables
already having such last+1 semantics to include the word "End".

Prefer "ptr1 >= ptr2" to "ptr1 + XLOG_BLCKSZ > ptr2" where both work; for
page-aligned positions the two are equivalent (see the second sketch below).

Move variables to shorter-lived scope.  Rename variables.  Edit comments.
(A third sketch, after the diff, illustrates the lock-free InitializedEnd
advance that the in-code proof describes.)
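Illustrating the compare-exchange point: pg_atomic_compare_exchange_u64()
writes the current value back into its "expected" argument on failure, so one
variable can carry the latest position across iterations with no extra atomic
read.  A minimal standalone C11 sketch of the reservation idiom (illustrative
names only -- BLCKSZ, reserved_end, and reserve_chunk() are not PostgreSQL
symbols; C11 atomic_compare_exchange_strong() has the same update-on-failure
semantics):

#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define BLCKSZ 8192

/* plays the role of XLogCtl->InitializeReservedEnd */
static _Atomic uint64_t reserved_end;

/* Reserve the next BLCKSZ-sized chunk; return its begin position. */
static uint64_t
reserve_chunk(void)
{
    uint64_t    begin = atomic_load(&reserved_end);

    /* On failure, "begin" is refreshed to the current counter value. */
    while (!atomic_compare_exchange_strong(&reserved_end, &begin,
                                           begin + BLCKSZ))
        ;                       /* no explicit re-read needed */
    return begin;
}

int
main(void)
{
    for (int i = 0; i < 4; i++)
    {
        uint64_t    begin = reserve_chunk();

        printf("reserved [%" PRIu64 ", %" PRIu64 ")\n", begin, begin + BLCKSZ);
    }
    return 0;
}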
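And for the comparison rewrite: assuming uint64 byte positions where both
operands are multiples of the block size (true for the rewritten test;
overflow aside), "p1 + BLCKSZ > p2" and "p1 >= p2" agree, so the simpler form
wins.  A standalone check:

#include <assert.h>
#include <stdint.h>

#define BLCKSZ 8192

int
main(void)
{
    for (uint64_t p1 = 0; p1 < 8 * BLCKSZ; p1 += BLCKSZ)
        for (uint64_t p2 = 0; p2 < 8 * BLCKSZ; p2 += BLCKSZ)
            assert((p1 + BLCKSZ > p2) == (p1 >= p2));
    return 0;
}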
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 47ffc0a..dd832d1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -468,37 +468,45 @@ typedef struct XLogCtlData
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
-	 * First initialized page in the cache (first byte position).
+	 * First byte of the first page containing post-recovery data.  (If one
+	 * page contains both pre-recovery and post-recovery data, it's that page.
+	 * This still points to the page's first byte, even though that particular
+	 * byte is pre-recovery.)  Does not change after startup.  Treat bytes
+	 * below this as infeasible to write out to pg_wal.
 	 */
-	XLogRecPtr	InitializedFrom;
+	XLogRecPtr	PreRecoveryPagesEnd;
 
 	/*
-	 * Latest reserved for initialization page in the cache (last byte
-	 * position + 1).
+	 * First byte that no ongoing AdvanceXLInsertBuffer() has reserved.
 	 *
-	 * To change the identity of a buffer, you need to advance
-	 * InitializeReserved first.  To change the identity of a buffer that's
-	 * still dirty, the old page needs to be written out first, and for that
-	 * you need WALWriteLock, and you need to ensure that there are no
-	 * in-progress insertions to the page by calling
+	 * Every byte greater than or equal to this has not started
+	 * initialization.  Before changing the xlblocks for a buffer (the
+	 * buffer's identity), you first need to advance InitializeReservedEnd to
+	 * a value >= the buffer's next future xlblocks value.  To change the
+	 * identity of a buffer that's still dirty, the old page needs to be
+	 * written out first, and for that you need WALWriteLock, and you need to
+	 * ensure that there are no in-progress insertions to the page by calling
 	 * WaitXLogInsertionsToFinish().
 	 */
-	pg_atomic_uint64 InitializeReserved;
+	pg_atomic_uint64 InitializeReservedEnd;
 
 	/*
-	 * Latest initialized page in the cache (last byte position + 1).
+	 * First byte in a possibly-uninitialized page.
 	 *
-	 * InitializedUpTo is updated after the buffer initialization.  After
-	 * update, waiters got notification using InitializedUpToCondVar.
+	 * Every byte less than this is in a page with both "xlblocks" and the
+	 * "pages" bytes initialized.  Bytes greater than or equal to this may or
+	 * may not be in pages lacking one of those initializations, since
+	 * updating this is the last step of initialization.  After update,
+	 * waiters get notification from InitializedEndCondVar.
 	 */
-	pg_atomic_uint64 InitializedUpTo;
-	ConditionVariable InitializedUpToCondVar;
+	pg_atomic_uint64 InitializedEnd;
+	ConditionVariable InitializedEndCondVar;
 
 	/*
 	 * These values do not change after startup, although the pointed-to pages
 	 * and xlblocks values certainly do.  xlblocks values are changed
 	 * lock-free according to the check for the xlog write position and are
-	 * accompanied by changes of InitializeReserved and InitializedUpTo.
+	 * accompanied by changes of InitializeReservedEnd and InitializedEnd.
 	 */
 	char	   *pages;			/* buffers for unwritten XLOG pages */
 	pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -1996,40 +2004,44 @@ static void
 AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 {
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	int			nextidx;
-	XLogRecPtr	OldPageRqstPtr;
-	XLogwrtRqst WriteRqst;
-	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
-	XLogPageHeader NewPage;
-	XLogRecPtr	ReservedPtr;
 	int			npages pg_attribute_unused() = 0;
 
 	/*
-	 * We must run the loop below inside the critical section as we expect
-	 * XLogCtl->InitializedUpTo to eventually keep up.  The most of callers
-	 * already run inside the critical section. Except for WAL writer, which
-	 * passed 'opportunistic == true', and therefore we don't perform
-	 * operations that could error out.
-	 *
-	 * Start an explicit critical section anyway though.
+	 * Once we raise InitializeReservedEnd, we incur a duty to continue this
+	 * initialization work until we've updated xlblocks.  Non-opportunistic
+	 * callers already run inside a critical section.  Coincidentally, the
+	 * opportunistic caller (walwriter) prefers no pg_wal I/O here, so we
+	 * don't perform error-throwing operations for it.  Non-walwriter callers
+	 * accept PANIC on pg_wal I/O failure.
 	 */
 	Assert(CritSectionCount > 0 || opportunistic);
 	START_CRIT_SECTION();
 
 	/*--
-	 * Loop till we get all the pages in WAL buffer before 'upto' reserved for
-	 * initialization.  Multiple process can initialize different buffers with
-	 * this loop in parallel as following.
+	 * For !opportunistic callers, loop till we initialize all the pages in
+	 * WAL buffers before 'upto'.  For "opportunistic" caller walwriter, we
+	 * loop through all pages already written per the local LogwrtResult.Write
+	 * cache.  The idea is that walwriter writes out a batch of pages, then
+	 * initializes each future page that will occupy a buffer walwriter wrote
+	 * out.
 	 *
-	 * 1. Reserve page for initialization using XLogCtl->InitializeReserved.
+	 * Multiple processes can initialize different buffers with this loop in
+	 * parallel as follows.
+	 *
+	 * 1. Reserve page for initialization using XLogCtl->InitializeReservedEnd.
 	 * 2. Initialize the reserved page.
-	 * 3. Attempt to advance XLogCtl->InitializedUpTo,
+	 * 3. Attempt to advance XLogCtl->InitializedEnd.
 	 */
-	ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
-	while (upto >= ReservedPtr || opportunistic)
+	NewPageBeginPtr = pg_atomic_read_u64(&XLogCtl->InitializeReservedEnd);
+	while (upto >= NewPageBeginPtr || opportunistic)
 	{
-		Assert(ReservedPtr % XLOG_BLCKSZ == 0);
+		int			nextidx;
+		XLogRecPtr	OldPageEndPtr;
+		XLogPageHeader NewPage;
+		XLogRecPtr	NewPageEndPtr;
+
+		Assert(NewPageBeginPtr % XLOG_BLCKSZ == 0);
 
 		/*
 		 * Get ending-offset of the buffer page we need to replace.
@@ -2038,18 +2050,28 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * must wait to be written.  If it was written, xlblocks will have this
 		 * position (or uninitialized)
 		 */
-		if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
-			OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+		if (NewPageBeginPtr >= XLogCtl->PreRecoveryPagesEnd + XLOG_BLCKSZ * XLOGbuffers)
+		{
+			XLogRecPtr	OldPageBeginPtr = NewPageBeginPtr - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+
+			OldPageEndPtr = OldPageBeginPtr + XLOG_BLCKSZ;
+		}
 		else
-			OldPageRqstPtr = InvalidXLogRecPtr;
+		{
+			/*
+			 * Total WAL writes since recovery have been less than
+			 * wal_buffers.  There's no old page to replace.
+			 */
+			OldPageEndPtr = InvalidXLogRecPtr;
+		}
 
-		if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
+		if (LogwrtResult.Write < OldPageEndPtr && opportunistic)
 		{
 			/*
 			 * If we just want to pre-initialize as much as we can without
 			 * flushing, give up now.
 			 */
-			upto = ReservedPtr - 1;
+			Assert(upto == InvalidXLogRecPtr);
 			break;
 		}
@@ -2057,29 +2079,29 @@
 		 * Attempt to reserve the page for initialization.  Failure means that
 		 * this page got reserved by another process.
 		 */
-		if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
-											&ReservedPtr,
-											ReservedPtr + XLOG_BLCKSZ))
+		if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReservedEnd,
+											&NewPageBeginPtr,
+											NewPageBeginPtr + XLOG_BLCKSZ))
 			continue;
 
 		/*
-		 * Wait till page gets correctly initialized up to OldPageRqstPtr.
+		 * Wait till page gets correctly initialized up to OldPageEndPtr.
 		 */
-		nextidx = XLogRecPtrToBufIdx(ReservedPtr);
-		while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
-			ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+		nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+		while (pg_atomic_read_u64(&XLogCtl->InitializedEnd) < OldPageEndPtr)
+			ConditionVariableSleep(&XLogCtl->InitializedEndCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
 		ConditionVariableCancelSleep();
-		Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
+		Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageEndPtr);
 
 		/* Fall through if it's already written out. */
-		if (LogwrtResult.Write < OldPageRqstPtr)
+		if (LogwrtResult.Write < OldPageEndPtr)
 		{
 			/* Nope, got work to do. */
 
 			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
-			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
-				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
+			if (XLogCtl->LogwrtRqst.Write < OldPageEndPtr)
+				XLogCtl->LogwrtRqst.Write = OldPageEndPtr;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
@@ -2087,23 +2109,25 @@
 			 * need to write it or if someone else already did.
 			 */
 			RefreshXLogWriteResult(LogwrtResult);
-			if (LogwrtResult.Write < OldPageRqstPtr)
+			if (LogwrtResult.Write < OldPageEndPtr)
 			{
-				WaitXLogInsertionsToFinish(OldPageRqstPtr);
+				WaitXLogInsertionsToFinish(OldPageEndPtr);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
 				RefreshXLogWriteResult(LogwrtResult);
-				if (LogwrtResult.Write >= OldPageRqstPtr)
+				if (LogwrtResult.Write >= OldPageEndPtr)
 				{
 					/* OK, someone wrote it already */
 					LWLockRelease(WALWriteLock);
 				}
 				else
 				{
+					XLogwrtRqst WriteRqst;
+
 					/* Have to write it ourselves */
 					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-					WriteRqst.Write = OldPageRqstPtr;
+					WriteRqst.Write = OldPageEndPtr;
 					WriteRqst.Flush = 0;
 					XLogWrite(WriteRqst, tli, false);
 					LWLockRelease(WALWriteLock);
@@ -2117,8 +2141,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * Now the next buffer slot is free and we can set it up to be the
 		 * next output page.
 		 */
-		NewPageBeginPtr = ReservedPtr;
-		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
@@ -2178,14 +2200,13 @@
 		/*
 		 * Make sure the initialization of the page becomes visible to others
-		 * before the xlblocks update.  GetXLogBuffer() reads xlblocks without
-		 * holding a lock.
+		 * before the xlblocks update.
 		 */
 		pg_write_barrier();
 
 		/*-----
 		 * Update the value of XLogCtl->xlblocks[nextidx] and try to advance
-		 * XLogCtl->InitializedUpTo in a lock-less manner.
+		 * XLogCtl->InitializedEnd in a lock-less manner.
 		 *
 		 * First, let's provide a formal proof of the algorithm.  Let it be 'n'
 		 * process with the following variables in shared memory:
@@ -2228,21 +2249,22 @@
 		 * where v gets stuck.  Q.E.D.
 		 *
 		 * To apply this proof to the code below, we assume
-		 * XLogCtl->InitializedUpTo will play the role of v with XLOG_BLCKSZ
+		 * XLogCtl->InitializedEnd will play the role of v with XLOG_BLCKSZ
 		 * granularity.  We also assume setting XLogCtl->xlblocks[nextidx] to
 		 * NewPageEndPtr to play the role of setting f[i] to true.  Also, note
 		 * that processes can't concurrently map different xlog locations to
 		 * the same nextidx because we previously requested that
-		 * XLogCtl->InitializedUpTo >= OldPageRqstPtr.  So, a xlog buffer can
+		 * XLogCtl->InitializedEnd >= OldPageEndPtr.  So, a xlog buffer can
 		 * be taken for initialization only once the previous initialization
-		 * takes effect on XLogCtl->InitializedUpTo.
+		 * takes effect on XLogCtl->InitializedEnd.
 		 */
+		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 		pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
 
 		pg_write_barrier();
 
-		while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+		while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedEnd, &NewPageBeginPtr, NewPageEndPtr))
 		{
 			NewPageBeginPtr = NewPageEndPtr;
 			NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
@@ -2254,10 +2276,10 @@
 			{
 				/*
 				 * Page at nextidx wasn't initialized yet, so we can't move
-				 * InitializedUpto further. It will be moved by backend which
+				 * InitializedEnd further.  It will be moved by the backend that
 				 * will initialize nextidx.
 				 */
-				ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+				ConditionVariableBroadcast(&XLogCtl->InitializedEndCondVar);
 				break;
 			}
 		}
@@ -2272,8 +2294,8 @@
 	 * initialization.  However, some pages might be reserved by concurrent
 	 * processes.  Wait till they finish initialization.
 	 */
-	while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
-		ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+	while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedEnd))
+		ConditionVariableSleep(&XLogCtl->InitializedEndCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
 	ConditionVariableCancelSleep();
 
 	pg_read_barrier();
@@ -5205,9 +5227,9 @@ XLOGShmemInit(void)
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
 
-	pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
-	pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
-	ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
+	pg_atomic_init_u64(&XLogCtl->InitializeReservedEnd, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->InitializedEnd, InvalidXLogRecPtr);
+	ConditionVariableInit(&XLogCtl->InitializedEndCondVar);
 }
 
 /*
@@ -6227,20 +6249,19 @@ StartupXLOG(void)
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-		XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
+		pg_atomic_write_u64(&XLogCtl->InitializedEnd, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+		XLogCtl->PreRecoveryPagesEnd = endOfRecoveryInfo->lastPageBeginPtr;
 	}
 	else
 	{
 		/*
-		 * There is no partial block to copy. Just set InitializedUpTo, and
-		 * let the first attempt to insert a log record to initialize the next
-		 * buffer.
+		 * There is no partial block to copy.  Just set InitializedEnd, and
+		 * let the first log record initialize the next buffer.
 		 */
-		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
-		XLogCtl->InitializedFrom = EndOfLog;
+		pg_atomic_write_u64(&XLogCtl->InitializedEnd, EndOfLog);
+		XLogCtl->PreRecoveryPagesEnd = EndOfLog;
 	}
-	pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
+	pg_atomic_write_u64(&XLogCtl->InitializeReservedEnd, pg_atomic_read_u64(&XLogCtl->InitializedEnd));
 
 	/*
 	 * Update local and shared status.  This is OK to do without any locks
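P.S. To make the in-code proof easier to follow, here is a minimal standalone
C11/pthreads sketch (not PostgreSQL code; NSLOTS, f, v, and worker() are
illustrative names) of the same lock-free pattern: each worker publishes its
slot (standing in for the xlblocks[nextidx] write), then tries to advance the
shared frontier v (standing in for InitializedEnd) through every consecutively
published slot.  The point where a worker stops because the next slot is
unpublished is where the real code broadcasts InitializedEndCondVar.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define NSLOTS 64

static atomic_bool f[NSLOTS];   /* f[i]: has slot i been published? */
static atomic_uint v;           /* frontier: first unpublished slot */

static void *
worker(void *arg)
{
    unsigned    i = (unsigned) (uintptr_t) arg;
    unsigned    expected = i;

    atomic_store(&f[i], true);  /* publish our slot first */

    /* Try to advance v from i to i + 1; on success, keep sweeping. */
    while (atomic_compare_exchange_strong(&v, &expected, expected + 1))
    {
        expected++;             /* v now equals expected */
        if (expected >= NSLOTS || !atomic_load(&f[expected]))
            break;              /* next slot's owner will advance v */
    }
    return NULL;
}

int
main(void)
{
    pthread_t   tid[NSLOTS];

    for (unsigned i = 0; i < NSLOTS; i++)
        pthread_create(&tid[i], NULL, worker, (void *) (uintptr_t) i);
    for (unsigned i = 0; i < NSLOTS; i++)
        pthread_join(tid[i], NULL);

    /* Per the proof, v cannot get stuck short of NSLOTS. */
    printf("frontier = %u (expected %d)\n", atomic_load(&v), NSLOTS);
    return 0;
}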