*** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** *** 5008,5021 **** heap2_desc(StringInfo buf, uint8 xl_info, char *rec) * to be done here.) */ void ! heap_sync(Relation rel) { /* temp tables never need fsync */ if (rel->rd_istemp) return; /* main heap */ ! FlushRelationBuffers(rel); /* FlushRelationBuffers will have opened rd_smgr */ smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM); --- 5008,5021 ---- * to be done here.) */ void ! heap_sync(Relation rel, bool wasLogged) { /* temp tables never need fsync */ if (rel->rd_istemp) return; /* main heap */ ! FlushRelationBuffers(rel, wasLogged); /* FlushRelationBuffers will have opened rd_smgr */ smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM); *************** *** 5027,5033 **** heap_sync(Relation rel) Relation toastrel; toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock); ! FlushRelationBuffers(toastrel); smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM); heap_close(toastrel, AccessShareLock); } --- 5027,5033 ---- Relation toastrel; toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock); ! FlushRelationBuffers(toastrel, wasLogged); smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM); heap_close(toastrel, AccessShareLock); } *** a/src/backend/access/heap/rewriteheap.c --- b/src/backend/access/heap/rewriteheap.c *************** *** 290,296 **** end_heap_rewrite(RewriteState state) * wrote before the checkpoint. */ if (!state->rs_new_rel->rd_istemp) ! heap_sync(state->rs_new_rel); /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); --- 290,296 ---- * wrote before the checkpoint. */ if (!state->rs_new_rel->rd_istemp) ! heap_sync(state->rs_new_rel, state->rs_use_wal); /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 5980,5985 **** StartupXLOG(void) --- 5980,5991 ---- InRecovery = true; } + /* + * If double write file exists, see if there are any pages to + * be recovered because of torn writes. + */ + smgr_recoverdoublewritefile(); + /* REDO */ if (InRecovery) { *** a/src/backend/commands/copy.c --- b/src/backend/commands/copy.c *************** *** 2223,2229 **** CopyFrom(CopyState cstate) * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel); } --- 2223,2229 ---- * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel, false); } *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** *** 3294,3300 **** ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) /* If we skipped writing WAL, then we need to sync the heap. */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(newrel); heap_close(newrel, NoLock); } --- 3294,3300 ---- /* If we skipped writing WAL, then we need to sync the heap. */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(newrel, false); heap_close(newrel, NoLock); } *************** *** 7114,7120 **** ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) * in shared buffers. We assume no new changes will be made while we are * holding exclusive lock on the rel. */ ! FlushRelationBuffers(rel); /* * Relfilenodes are not unique across tablespaces, so we need to allocate --- 7114,7120 ---- * in shared buffers. We assume no new changes will be made while we are * holding exclusive lock on the rel. */ ! 
FlushRelationBuffers(rel, true); /* * Relfilenodes are not unique across tablespaces, so we need to allocate *************** *** 7213,7219 **** copy_relation_data(SMgrRelation src, SMgrRelation dst, /* If we got a cancel signal during the copy of the data, quit */ CHECK_FOR_INTERRUPTS(); ! smgrread(src, forkNum, blkno, buf); /* XLOG stuff */ if (use_wal) --- 7213,7219 ---- /* If we got a cancel signal during the copy of the data, quit */ CHECK_FOR_INTERRUPTS(); ! smgrread(src, forkNum, blkno, buf, NULL); /* XLOG stuff */ if (use_wal) *** a/src/backend/executor/execMain.c --- b/src/backend/executor/execMain.c *************** *** 2351,2357 **** CloseIntoRel(QueryDesc *queryDesc) /* If we skipped using WAL, must heap_sync before commit */ if (myState->hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(myState->rel); /* close rel, but keep lock until commit */ heap_close(myState->rel, NoLock); --- 2351,2357 ---- /* If we skipped using WAL, must heap_sync before commit */ if (myState->hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(myState->rel, false); /* close rel, but keep lock until commit */ heap_close(myState->rel, NoLock); *** a/src/backend/postmaster/postmaster.c --- b/src/backend/postmaster/postmaster.c *************** *** 314,319 **** extern int optind, --- 314,320 ---- extern int page_checksum; extern bool fullPageWrites; + extern bool doubleWrites; #ifdef HAVE_INT_OPTRESET extern int optreset; /* might not be declared by system headers */ *************** *** 737,763 **** PostmasterMain(int argc, char *argv[]) if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); ! ! /* ! * The idea here is that there will be checksum mismatches if there ! * are partial writes to pages during hardware crashes. So, the user ! * should have full_page_writes enabled if page_checksum is enabled, ! * so that these pages are automatically fixed, else Postgres may ! * often get checksum errors after crashes (on pages that are in fact ! * partially written and hence corrupted). With full_page_writes ! * enabled, Postgres will replace each page without ever looking at ! * the partially-written page and seeing an incorrect checksum. ! * Hence, checksums will detect only real disk corruptions (where the ! * disk reported a successful write but the data was still corrupted ! * at some point). ! * ! * Alternatively, we may want to leave this check out, for those ! * sophisticated users who know that their hardware/software setup ! * can never produce partial writes during crashes. ! */ ! if (page_checksum && !fullPageWrites) ereport(ERROR, ! (errmsg("full_page_writes must be enabled if page_checksum is enabled"))); /* * Other one-time internal sanity checks can go here, if they are fast. --- 738,747 ---- if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); ! if (doubleWrites && !page_checksum) { ereport(ERROR, ! (errmsg("page_checksum must be enabled if double_writes is enabled"))); ! } /* * Other one-time internal sanity checks can go here, if they are fast.
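(Note on configuration: the machinery added below is driven by four GUCs, registered in guc.c further down. A minimal postgresql.conf sketch for exercising it; the directory value is a hypothetical example, leaving double_write_directory unset puts the file at base/double, and page_checksum comes from the companion block-checksum patch:

    page_checksum = on                     # must be on when double_writes is on, per the check above
    double_writes = on                     # default off
    batched_buffer_writes = 32             # 0 disables batching; capped at MAX_BATCHED_WRITES (64)
    double_write_directory = '/ssd/pgdw'   # hypothetical path; the file itself is named "double"
)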
*** a/src/backend/storage/buffer/buf_init.c --- b/src/backend/storage/buffer/buf_init.c *************** *** 127,132 **** InitBufferPool(void) --- 127,133 ---- /* Init other shared buffer-management stuff */ StrategyInitialize(!foundDescs); + Bufmgr_DoubleWriteInit(); } /* *** a/src/backend/storage/buffer/bufmgr.c --- b/src/backend/storage/buffer/bufmgr.c *************** *** 41,46 **** --- 41,47 ---- #include "postmaster/bgwriter.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" + #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" #include "storage/smgr.h" *************** *** 60,65 **** --- 61,67 ---- /* Bits in SyncOneBuffer's return value */ #define BUF_WRITTEN 0x01 #define BUF_REUSABLE 0x02 + #define BUF_RETRY_AFTER_FLUSH 0x20 /* GUC variables */ *************** *** 74,81 **** double bgwriter_lru_multiplier = 2.0; */ int target_prefetch_pages = 0; /* local state for StartBufferIO and related functions */ ! static volatile BufferDesc *InProgressBuf = NULL; static bool IsForInput; /* local state for LockBufferForCleanup */ --- 76,86 ---- */ int target_prefetch_pages = 0; + extern bool doubleWrites; + /* local state for StartBufferIO and related functions */ ! static volatile BufferDesc *InProgressBuf[MAX_BATCHED_WRITES]; ! static int InProgressIndex = 0; static bool IsForInput; /* local state for LockBufferForCleanup */ *************** *** 90,96 **** static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy); --- 95,106 ---- static void PinBuffer_Locked(volatile BufferDesc *buf); static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner); static void BufferSync(int flags); + static int SyncOneBufferPrepare(int buf_id, bool skip_recently_used); static int SyncOneBuffer(int buf_id, bool skip_recently_used); + static int WriteOneBufferPrepare(int buf_id, char *callerBuf, + bool skip_recently_used); + static void WriteOneBufferDone(int buf_id, bool doUnpin); + static void WaitIO(volatile BufferDesc *buf); static bool StartBufferIO(volatile BufferDesc *buf, bool forInput); static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, *************** *** 102,107 **** static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, ForkNumber forkNum, --- 112,122 ---- bool *foundPtr); static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); static void AtProcExit_Buffers(int code, Datum arg); + static bool FlushBufferPrepare(int buf_id, volatile BufferDesc *buf, + char *callerBuf, + SMgrRelation reln); + static void CheckDoubleWriteFile(void); + static void FlushBatch(bool doUnpin); /* *************** *** 426,432 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum, MemSet((char *) bufBlock, 0, BLCKSZ); else { ! smgrread(smgr, forkNum, blockNum, (char *) bufBlock); /* check for garbage data */ if (!PageHeaderIsValid((PageHeader) bufBlock)) --- 441,447 ---- MemSet((char *) bufBlock, 0, BLCKSZ); else { ! smgrread(smgr, forkNum, blockNum, (char *) bufBlock, NULL); /* check for garbage data */ if (!PageHeaderIsValid((PageHeader) bufBlock)) *************** *** 639,646 **** BufferAlloc(SMgrRelation smgr, ForkNumber forkNum, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode); ! FlushBuffer(buf, NULL); ! LWLockRelease(buf->content_lock); TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum, smgr->smgr_rnode.spcNode, --- 654,676 ---- smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode); ! if (doubleWrites) { ! CheckDoubleWriteFile(); ! /* ! * Make a one-element batch, so we can do the double ! 
* writes. We are serializing on the double write ! * file here. We could also have a small double ! * write file for each backend. ! */ ! if (FlushBufferPrepare(buf->buf_id, buf, NULL, NULL)) { ! FlushBatch(false); ! } else { ! LWLockRelease(buf->content_lock); ! } ! } else { ! FlushBuffer(buf, NULL); ! LWLockRelease(buf->content_lock); ! } TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum, smgr->smgr_rnode.spcNode, *************** *** 1146,1151 **** UnpinBuffer(volatile BufferDesc *buf, bool fixOwner) --- 1176,1343 ---- } } + extern bool page_checksum; + extern int batched_buffer_writes; + + /* File to which double writes are going */ + File doubleWriteFile = 0; + LWLockId doubleWriteLock; + /* Buffers into which batched buffers are checksummed */ + char *origCksumBuf, *cksumBuf; + + /* Array of buffer writes that are being batched. */ + struct SMgrWriteList writeList[MAX_BATCHED_WRITES]; + int writeIndex; + + + + /* + * Unlock the buffers in the writeList[] (called after all IO is + * complete). Release them in reverse order, so InProgressBuf[] is + * managed efficiently. + */ + static void + UnlockBufs(bool doUnpin) + { + int j; + + for (j = writeIndex-1; j >= 0; j--) { + WriteOneBufferDone(writeList[j].buf_id, doUnpin); + } + writeIndex = 0; + } + + /* + * Initialize the lock on the double-write file (run once in postmaster). + */ + void + Bufmgr_DoubleWriteInit() + { + doubleWriteLock = LWLockAssign(); + } + + /* + * Pre-allocate the file with zeroed BLCKSZ-sized pages, to at least len bytes. + */ + static void + ZeroFile(FileName name, int len) + { + char *zbuffer; + int nbytes; + + int fd = BasicOpenFile(name, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR); + if (fd < 0) { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", name))); + } + zbuffer = (char *) palloc0(BLCKSZ); + for (nbytes = 0; nbytes < len; nbytes += BLCKSZ) { + errno = 0; + if ((int) write(fd, zbuffer, BLCKSZ) != (int) BLCKSZ) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk space + */ + unlink(name); + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", name))); + } + } + pfree(zbuffer); + + if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", name))); + + if (close(fd)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", name))); + } + + /* + * Allocate the buffers used for checksumming a batch of writes. + */ + static bool + AllocCksumBuf() + { + if (batched_buffer_writes > 0 && page_checksum && cksumBuf == NULL) { + origCksumBuf = (char *)palloc(batched_buffer_writes * BLCKSZ + ALIGNOF_BUFFER); + cksumBuf = (char *)TYPEALIGN(ALIGNOF_BUFFER, origCksumBuf); + return true; + } + return false; + } + + static void + FreeCksumBuf() + { + if (cksumBuf != NULL) { + pfree(origCksumBuf); + cksumBuf = origCksumBuf = NULL; + } + } + + /* Round up a to next multiple of b */ + #define ROUNDUP(a, b) ((((a)+(b)-1)/(b))*(b)) + + /* + * Open the double-write file and zero it out, if necessary. 
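+ *
+ * (Sizing example, assuming the defaults BLCKSZ = 8192 and
+ * DOUBLE_WRITE_HEADER_SIZE = 4096: with batched_buffer_writes = 32 the
+ * file is ROUNDUP(32 * 8192 + 4096, BLCKSZ) = 270336 bytes, i.e. a 4 KB
+ * header, 32 data pages, and 4 KB of zero padding.)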
+ */ + static void + CheckDoubleWriteFile() + { + if (doubleWrites) { + if (doubleWriteFile == 0) { + struct stat stat_buf; + char *name = smgr_doublewritefilename(); + int len = ROUNDUP(batched_buffer_writes * BLCKSZ + DOUBLE_WRITE_HEADER_SIZE, + BLCKSZ); + + /* Make sure that the file is completely allocated */ + LWLockAcquire(doubleWriteLock, LW_EXCLUSIVE); + if (stat(name, &stat_buf) == -1 || stat_buf.st_size < len) { + /* Fill out file if it is not big enough */ + ZeroFile(name, len); + } + LWLockRelease(doubleWriteLock); + + doubleWriteFile = PathNameOpenFile(name, + O_RDWR | O_CREAT, + S_IRUSR | S_IWUSR); + if (doubleWriteFile < 0) { + elog(PANIC, "Couldn't open double-write file %s", name); + } + pfree(name); + } + } + } + + /* + * Write out the current batch of buffers, doing a double write if + * required and releasing the buffer locks. + */ + static void + FlushBatch(bool doUnpin) + { + if (doubleWrites) { + LWLockAcquire(doubleWriteLock, LW_EXCLUSIVE); + } + smgrbwrite(writeIndex, writeList, doubleWriteFile, false); + if (doubleWrites) { + LWLockRelease(doubleWriteLock); + } + UnlockBufs(doUnpin); + } + /* * BufferSync -- Write out all dirty buffers in the pool. * *************** *** 1160,1165 **** BufferSync(int flags) --- 1352,1363 ---- int num_to_scan; int num_to_write; int num_written; + int64 targetTime, now, start; + int cur_num_written; + bool batched; + + AllocCksumBuf(); + CheckDoubleWriteFile(); /* Make sure we can handle the pin inside SyncOneBuffer */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); *************** *** 1205,1210 **** BufferSync(int flags) --- 1403,1411 ---- TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write); + if (log_checkpoints) { + elog(LOG, "checkpoint: %d buffers to write", num_to_write); + } /* * Loop over all buffers again, and write the ones (still) marked with * BM_CHECKPOINT_NEEDED. In this loop, we start at the clock sweep point *************** *** 1213,1218 **** BufferSync(int flags) --- 1414,1426 ---- * Note that we don't read the buffer alloc count here --- that should be * left untouched till the next BgBufferSync() call. */ + if (writeIndex != 0) { + elog(LOG, "BufferSync: incomplete batch left over"); + } + start = GetCurrentTimestamp(); + targetTime = start + 30000000; + cur_num_written = 0; + batched = (batched_buffer_writes > 0); buf_id = StrategySyncStart(NULL, NULL); num_to_scan = NBuffers; num_written = 0; *************** *** 1234,1245 **** BufferSync(int flags) */ if (bufHdr->flags & BM_CHECKPOINT_NEEDED) { ! if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN) { TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); BgWriterStats.m_buf_written_checkpoints++; num_written++; /* * We know there are at most num_to_write buffers with * BM_CHECKPOINT_NEEDED set; so we can stop scanning if --- 1442,1489 ---- */ if (bufHdr->flags & BM_CHECKPOINT_NEEDED) { ! int r; ! if (!batched) { ! r = SyncOneBuffer(buf_id, false); ! } else { ! retry: ! r = WriteOneBufferPrepare(buf_id, ! cksumBuf + writeIndex * BLCKSZ, ! false); ! if (r & BUF_RETRY_AFTER_FLUSH) { ! if (writeIndex > 0) { ! elog(LOG, "Couldn't get content_lock, flushing"); ! FlushBatch(true); ! } else { ! elog(LOG, "Couldn't get content_lock, sleeping"); ! pg_usleep(10000); ! } ! goto retry; ! } ! ! } ! ! if (r & BUF_WRITTEN) { TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); BgWriterStats.m_buf_written_checkpoints++; num_written++; + if (!batched) { + /* + * Perform normal bgwriter duties and sleep to throttle our + * I/O rate. 
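+ * (In the batched case the pages are actually written when FlushBatch()
+ * runs below, so the checkpoint delay throttles whole batches rather
+ * than individual pages.)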
+ */ + CheckpointWriteDelay(flags, + (double)num_written / num_to_write); + } + } + if (batched) { + if (writeIndex == batched_buffer_writes || + (writeIndex > 0 && num_to_scan == 0)) { + FlushBatch(true); /* * We know there are at most num_to_write buffers with * BM_CHECKPOINT_NEEDED set; so we can stop scanning if *************** *** 1262,1267 **** BufferSync(int flags) --- 1506,1523 ---- (double) num_written / num_to_write); } } + if (log_checkpoints && + ((now = GetCurrentTimestamp()) >= targetTime || + num_to_scan == 0)) { + /* Print out a checkpoint progress message */ + int progress = num_written * 100 / num_to_write; + int target = (now - start) * 100 / (int)(1000000 * CheckPointTimeout * CheckPointCompletionTarget); + elog(LOG, "%d written/s, %d%%, target %d%%", + (num_written - cur_num_written) / 30, + progress, target); + cur_num_written = num_written; + targetTime = now + 30000000; + } if (++buf_id >= NBuffers) buf_id = 0; *************** *** 1274,1279 **** BufferSync(int flags) --- 1530,1536 ---- CheckpointStats.ckpt_bufs_written += num_written; TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write); + FreeCksumBuf(); } /* *************** *** 1320,1325 **** BgBufferSync(void) --- 1577,1584 ---- int num_to_scan; int num_written; int reusable_buffers; + bool batched; + bool alloc; /* * Find out where the freelist clock sweep currently is, and how many *************** *** 1500,1509 **** BgBufferSync(void) num_written = 0; reusable_buffers = reusable_buffers_est; /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { ! int buffer_state = SyncOneBuffer(next_to_clean, true); if (++next_to_clean >= NBuffers) { --- 1759,1792 ---- num_written = 0; reusable_buffers = reusable_buffers_est; + alloc = AllocCksumBuf(); + CheckDoubleWriteFile(); + + batched = (batched_buffer_writes > 0); /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { ! int buffer_state; ! ! if (!batched) { ! buffer_state = SyncOneBuffer(next_to_clean, true); ! } else { ! retry: ! buffer_state = WriteOneBufferPrepare(next_to_clean, ! cksumBuf + writeIndex * BLCKSZ, ! true); ! if (buffer_state & BUF_RETRY_AFTER_FLUSH) { ! if (writeIndex > 0) { ! elog(LOG, "Couldn't get content_lock, flushing"); ! FlushBatch(true); ! } else { ! elog(LOG, "Couldn't get content_lock, sleeping"); ! pg_usleep(10000); ! } ! goto retry; ! } ! } ! if (++next_to_clean >= NBuffers) { *************** *** 1523,1528 **** BgBufferSync(void) --- 1806,1823 ---- } else if (buffer_state & BUF_REUSABLE) reusable_buffers++; + + /* If we've filled a batch, write it out */ + if (batched && writeIndex == batched_buffer_writes) { + FlushBatch(true); + } + } + /* If we have an incomplete batch, write it out. */ + if (batched && writeIndex > 0) { + FlushBatch(true); + } + if (alloc) { + FreeCksumBuf(); } BgWriterStats.m_buf_written_clean += num_written; *************** *** 1560,1582 **** BgBufferSync(void) } /* ! * SyncOneBuffer -- process a single buffer during syncing. ! * ! * If skip_recently_used is true, we don't write currently-pinned buffers, nor ! * buffers marked recently used, as these are not replacement candidates. ! * ! * Returns a bitmask containing the following flag bits: ! * BUF_WRITTEN: we wrote the buffer. ! * BUF_REUSABLE: buffer is available for replacement, ie, it has ! * pin count 0 and usage count 0. ! * ! * (BUF_WRITTEN could be set in error if FlushBuffers finds the buffer clean ! * after locking it, but we don't care all that much.) ! * ! 
* Note: caller must have done ResourceOwnerEnlargeBuffers. */ static int ! SyncOneBuffer(int buf_id, bool skip_recently_used) { volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; int result = 0; --- 1855,1864 ---- } /* ! * Prepare for syncing out one buffer. */ static int ! SyncOneBufferPrepare(int buf_id, bool skip_recently_used) { volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; int result = 0; *************** *** 1613,1619 **** SyncOneBuffer(int buf_id, bool skip_recently_used) * buffer is clean by the time we've locked it.) */ PinBuffer_Locked(bufHdr); ! LWLockAcquire(bufHdr->content_lock, LW_SHARED); FlushBuffer(bufHdr, NULL); --- 1895,1942 ---- * buffer is clean by the time we've locked it.) */ PinBuffer_Locked(bufHdr); ! ! /* ! * Since we are acquiring multiple buffers, we must do a ! * conditional-acquire. If we can't get the lock (very infrequent), ! * then we force a flush of the current batch of buffers, so we can ! * release our current set of locks before trying to get this one ! * again. ! */ ! if (!LWLockConditionalAcquire(bufHdr->content_lock, LW_SHARED)) { ! UnpinBuffer(bufHdr, true); ! return result | BUF_RETRY_AFTER_FLUSH; ! } ! return result | BUF_WRITTEN; ! } ! ! /* ! * SyncOneBuffer -- process a single buffer during syncing. ! * ! * If skip_recently_used is true, we don't write currently-pinned buffers, nor ! * buffers marked recently used, as these are not replacement candidates. ! * ! * Returns a bitmask containing the following flag bits: ! * BUF_WRITTEN: we wrote the buffer. ! * BUF_REUSABLE: buffer is available for replacement, ie, it has ! * pin count 0 and usage count 0. ! * BUF_RETRY_AFTER_FLUSH: could not acquire the buffer's content_lock ! * without blocking; caller should flush its current batch and retry. ! * ! * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean ! * after locking it, but we don't care all that much.) ! * ! * Note: caller must have done ResourceOwnerEnlargeBuffers. ! */ ! static int ! SyncOneBuffer(int buf_id, bool skip_recently_used) ! { ! volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; ! int result; ! ! result = SyncOneBufferPrepare(buf_id, skip_recently_used); ! if ((result & BUF_WRITTEN) == 0) { ! return result; ! } FlushBuffer(bufHdr, NULL); *************** *** 1625,1630 **** SyncOneBuffer(int buf_id, bool skip_recently_used) --- 1948,1991 ---- /* + * Prepare for writing out one buffer. + */ + static int + WriteOneBufferPrepare(int buf_id, char *callerBuf, bool skip_recently_used) + { + volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; + int result; + + result = SyncOneBufferPrepare(buf_id, skip_recently_used); + if ((result & BUF_WRITTEN) == 0) { + return result; + } + if (!FlushBufferPrepare(buf_id, bufHdr, callerBuf, NULL)) { + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true); + elog(LOG, "Buffer already flushed by someone else"); + result &= ~BUF_WRITTEN; + } + return result; + } + + /* + * Release locks after writing out a buffer. + */ + static void + WriteOneBufferDone(int buf_id, bool doUnpin) + { + volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; + + pgBufferUsage.shared_blks_written++; + TerminateBufferIO(bufHdr, true, 0); + LWLockRelease(bufHdr->content_lock); + if (doUnpin) { + UnpinBuffer(bufHdr, true); + } + } + + /* * AtEOXact_Buffers - clean up at end of transaction. * * As of PostgreSQL 8.0, buffer pins should get released by the *************** *** 1901,1906 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) --- 2262,2319 ---- } /* + * Prepare to flush out a buffer and record it on the writeList. 
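+ *
+ * The caller must hold the buffer's content_lock; this function also
+ * takes the io_in_progress lock via StartBufferIO(). Both stay held
+ * while the entry sits on writeList[]: FlushBatch() later pushes the
+ * accumulated entries through smgrbwrite() and then UnlockBufs()
+ * releases them in reverse order.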
+ */ + static bool + FlushBufferPrepare(int buf_id, volatile BufferDesc *buf, char *callerBuf, + SMgrRelation reln) + { + XLogRecPtr recptr; + + Assert(LWLockHeldByMe(buf->content_lock)); + /* + * Acquire the buffer's io_in_progress lock. If StartBufferIO returns + * false, then someone else flushed the buffer before we could, so we need + * not do anything. + */ + if (!StartBufferIO(buf, false)) + return FALSE; + + /* Find smgr relation for buffer */ + if (reln == NULL) + reln = smgropen(buf->tag.rnode); + + /* + * Force XLOG flush up to buffer's LSN. This implements the basic WAL + * rule that log updates must hit disk before any of the data-file changes + * they describe do. + */ + recptr = BufferGetLSN(buf); + XLogFlush(recptr); + + /* + * Now it's safe to write buffer to disk. Note that no one else should + * have been able to write it while we were busy with log flushing because + * we have the io_in_progress lock. + */ + + /* To check if block content changes while flushing. - vadim 01/17/97 */ + LockBufHdr(buf); + buf->flags &= ~BM_JUST_DIRTIED; + UnlockBufHdr(buf); + + writeList[writeIndex].buf_id = buf_id; + writeList[writeIndex].reln = reln; + writeList[writeIndex].forkNum = buf->tag.forkNum; + writeList[writeIndex].blockNum = buf->tag.blockNum; + writeList[writeIndex].buffer = (char *)BufHdrGetBlock(buf); + writeList[writeIndex].callerBuf = callerBuf; + writeIndex++; + return TRUE; + } + + + /* * RelationGetNumberOfBlocks * Determines the current number of pages in the relation. */ *************** *** 2050,2055 **** PrintPinnedBufs(void) --- 2463,2504 ---- } #endif + /* + * Flush a buffer, doing it in a batch if doBatch is true. Requires that + * bufHdr is already locked. + */ + static void + FlushBufferBatched(volatile BufferDesc *bufHdr, SMgrRelation reln, bool doBatch) + { + PinBuffer_Locked(bufHdr); + if (doBatch) { + retry: + Assert(batched_buffer_writes > 0); + if (writeIndex == 0) { + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + } else { + if (!LWLockConditionalAcquire(bufHdr->content_lock, + LW_SHARED)) { + FlushBatch(true); + goto retry; + } + } + if (!FlushBufferPrepare(bufHdr->buf_id, bufHdr, + cksumBuf + writeIndex * BLCKSZ, reln)) { + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true); + } + if (writeIndex == batched_buffer_writes) { + FlushBatch(true); + } + } else { + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + FlushBuffer(bufHdr, reln); + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true); + } + } + /* --------------------------------------------------------------------- * FlushRelationBuffers * *************** *** 2070,2076 **** PrintPinnedBufs(void) * -------------------------------------------------------------------- */ void ! FlushRelationBuffers(Relation rel) { int i; volatile BufferDesc *bufHdr; --- 2519,2525 ---- * -------------------------------------------------------------------- */ void ! FlushRelationBuffers(Relation rel, bool wasLogged) { int i; volatile BufferDesc *bufHdr; *************** *** 2110,2115 **** FlushRelationBuffers(Relation rel) --- 2559,2566 ---- return; } + AllocCksumBuf(); + CheckDoubleWriteFile(); /* Make sure we can handle the pin inside the loop */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); *************** *** 2120,2134 **** FlushRelationBuffers(Relation rel) if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! PinBuffer_Locked(bufHdr); ! LWLockAcquire(bufHdr->content_lock, LW_SHARED); ! 
FlushBuffer(bufHdr, rel->rd_smgr); ! LWLockRelease(bufHdr->content_lock); ! UnpinBuffer(bufHdr, true); } else UnlockBufHdr(bufHdr); } } /* --------------------------------------------------------------------- --- 2571,2591 ---- if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! /* ! * Don't batch if the pages were not logged, since in ! * that case we don't want to do double writes, even if ! * the double write option is on. ! */ ! FlushBufferBatched(bufHdr, rel->rd_smgr, wasLogged && doubleWrites); } else UnlockBufHdr(bufHdr); } + if (wasLogged && doubleWrites && writeIndex > 0) { + /* Write out any incomplete batch */ + FlushBatch(true); + } + FreeCksumBuf(); } /* --------------------------------------------------------------------- *************** *** 2152,2157 **** FlushDatabaseBuffers(Oid dbid) --- 2609,2616 ---- int i; volatile BufferDesc *bufHdr; + AllocCksumBuf(); + CheckDoubleWriteFile(); /* Make sure we can handle the pin inside the loop */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); *************** *** 2162,2176 **** FlushDatabaseBuffers(Oid dbid) if (bufHdr->tag.rnode.dbNode == dbid && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! PinBuffer_Locked(bufHdr); ! LWLockAcquire(bufHdr->content_lock, LW_SHARED); ! FlushBuffer(bufHdr, NULL); ! LWLockRelease(bufHdr->content_lock); ! UnpinBuffer(bufHdr, true); } else UnlockBufHdr(bufHdr); } } /* --- 2621,2636 ---- if (bufHdr->tag.rnode.dbNode == dbid && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! FlushBufferBatched(bufHdr, NULL, doubleWrites); } else UnlockBufHdr(bufHdr); } + if (doubleWrites && writeIndex > 0) { + /* Write out any incomplete batch */ + FlushBatch(true); + } + FreeCksumBuf(); } /* *************** *** 2583,2590 **** WaitIO(volatile BufferDesc *buf) static bool StartBufferIO(volatile BufferDesc *buf, bool forInput) { - Assert(!InProgressBuf); - for (;;) { /* --- 3043,3048 ---- *************** *** 2623,2629 **** StartBufferIO(volatile BufferDesc *buf, bool forInput) UnlockBufHdr(buf); ! InProgressBuf = buf; IsForInput = forInput; return true; --- 3081,3087 ---- UnlockBufHdr(buf); ! InProgressBuf[InProgressIndex++] = buf; IsForInput = forInput; return true; *************** *** 2650,2656 **** static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits) { ! Assert(buf == InProgressBuf); LockBufHdr(buf); --- 3108,3115 ---- TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits) { ! /* BufferSync() will release batched buffers in reverse order */ ! Assert(InProgressBuf[InProgressIndex-1] == buf); LockBufHdr(buf); *************** *** 2662,2669 **** TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, UnlockBufHdr(buf); ! InProgressBuf = NULL; ! LWLockRelease(buf->io_in_progress_lock); } --- 3121,3128 ---- UnlockBufHdr(buf); ! InProgressIndex--; ! InProgressBuf[InProgressIndex] = NULL; LWLockRelease(buf->io_in_progress_lock); } *************** *** 2679,2688 **** TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, void AbortBufferIO(void) { ! volatile BufferDesc *buf = InProgressBuf; ! if (buf) ! { /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that --- 3138,3148 ---- void AbortBufferIO(void) { ! volatile BufferDesc *buf; ! int i; ! for (i = InProgressIndex - 1; i >= 0; i--) { ! 
buf = InProgressBuf[i]; /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that *************** *** 2723,2728 **** AbortBufferIO(void) --- 3183,3189 ---- } TerminateBufferIO(buf, false, BM_IO_ERROR); } + InProgressIndex = 0; } /* *** a/src/backend/storage/file/fd.c --- b/src/backend/storage/file/fd.c *************** *** 1260,1265 **** retry: --- 1260,1454 ---- return returnCode; } + struct io_iocb_common { + void *buf; + unsigned nbytes; + long long offset; + }; + + struct iocb { + void *data; + int aio_fildes; + union { + struct io_iocb_common c; + } u; + }; + + #if 0 + struct iovec { + void *iov_base; + size_t iov_len; + }; + #endif + + /* Array of iocbs used for io_submit */ + struct iocb ioc[MAX_BATCHED_WRITES]; + int iocIndex = 0; + + /* Array of iovecs used for iocbs and for writev. One extra entry for + * double-write header. */ + struct iovec iov[MAX_BATCHED_WRITES+1]; + int iovIndex = 0; + + static int + iovlen(struct iocb *ioc) + { + int i, len; + struct iovec *iov = (struct iovec *)ioc->u.c.buf; + + len = 0; + for (i = 0; i < ioc->u.c.nbytes; i++) { + len += iov->iov_len; + iov++; + } + return len; + } + + /* + * Flush existing batched IOs, doing a write to a temp area first if + * the double_writes option is turned on. + */ + static int + FlushIOs(File doubleWriteFile) + { + int returnCode; + int i, j; + + if (doubleWriteFile > 0) { + returnCode = FileAccess(doubleWriteFile); + if (returnCode < 0) + return returnCode; + returnCode = lseek(VfdCache[doubleWriteFile].fd, 0, SEEK_SET); + if (returnCode < 0) { + elog(LOG, "lseek error errno %d, rc %d", + errno, returnCode); + abort(); + return returnCode; + } + /* Write out all the blocks sequentially in double-write file, + * starting with the double-write file header describing all the + * buffers. Double-write file should be open with O_DIRECT. */ + returnCode = writev(VfdCache[doubleWriteFile].fd, &iov[0], iovIndex); + if (returnCode < 0) { + elog(LOG, "writev to tmp, errno %d, rc %d, iovIndex %d", + errno, returnCode, iovIndex); + abort(); + return returnCode; + } + returnCode = fdatasync(VfdCache[doubleWriteFile].fd); + if (returnCode < 0) { + elog(LOG, "fdatasync error errno %d, rc %d", errno, + returnCode); + abort(); + return returnCode; + } + } + + for (i = 0; i < iocIndex; i++) { + returnCode = lseek(ioc[i].aio_fildes, ioc[i].u.c.offset, SEEK_SET); + if (returnCode < 0) { + elog(LOG, "lseek error errno %d, rc %d, offset %lld", + errno, returnCode, ioc[i].u.c.offset); + abort(); + return returnCode; + } + /* Use writev to do one IO per run of consecutive blocks on disk. */ + returnCode = writev(ioc[i].aio_fildes, ioc[i].u.c.buf, + ioc[i].u.c.nbytes); + if (returnCode < 0) { + elog(LOG, "writev error errno %d, rc %d", errno, returnCode); + abort(); + return returnCode; + } + } + if (doubleWriteFile > 0) { + /* If we are doing double writes, must make sure that the + * blocks we just wrote are forced to disk, so we can re-use + * the double-write buffer next time. */ + for (i = 0; i < iocIndex; i++) { + for (j = 0; j < i; j++) { + if (ioc[j].aio_fildes == ioc[i].aio_fildes) { + break; + } + } + if (j == i) { + returnCode = fdatasync(ioc[i].aio_fildes); + if (returnCode < 0) { + elog(LOG, "fdatasync error errno %d, rc %d", errno, + returnCode); + abort(); + return returnCode; + } + } + } + } + return 0; + } + + #include "storage/smgr.h" + + /* + * Write out a list of memory regions to locations in files, as specified + * in writeList. 
If doubleWriteFile is > 0, then also do double writes + * to the specified file (so full_page_writes can be avoided). + */ + int + FileBwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf) + { + int returnCode; + int i; + + for (i = 0; i < writeLen; i++) { + struct SMgrWriteList *w = &(writeList[i]); + + Assert(FileIsValid(w->fd)); + DO_DB(elog(LOG, "FileBwrite: %d (%s) " INT64_FORMAT " %d %p", + w->fd, VfdCache[w->fd].fileName, + (int64) VfdCache[w->fd].seekPos, + w->len, w->callerBuf)); + + returnCode = FileAccess(w->fd); + if (returnCode < 0) + return returnCode; + } + + iovIndex = 0; + iocIndex = 0; + + if (doubleWriteFile > 0) { + /* Write out the block directory in a 4K chunk at the beginning */ + iov[iovIndex].iov_base = doubleBuf; + iov[iovIndex].iov_len = DOUBLE_WRITE_HEADER_SIZE; + iovIndex++; + } + + for (i = 0; i < writeLen; i++) { + struct SMgrWriteList *w = &(writeList[i]); + struct iocb *last; + + iov[iovIndex].iov_base = w->callerBuf; + iov[iovIndex].iov_len = w->len; + last = (iocIndex > 0) ? &(ioc[iocIndex - 1]) : NULL; + if (iocIndex > 0 && w->seekPos == last->u.c.offset + iovlen(last) && + last->aio_fildes == VfdCache[w->fd].fd) { + last->u.c.nbytes++; + } else { + ioc[iocIndex].aio_fildes = VfdCache[w->fd].fd; + ioc[iocIndex].u.c.buf = (void *)&iov[iovIndex]; + ioc[iocIndex].u.c.nbytes = 1; + ioc[iocIndex].u.c.offset = w->seekPos; + iocIndex++; + } + iovIndex++; + VfdCache[w->fd].seekPos = FileUnknownPos; + } + + returnCode = FlushIOs(doubleWriteFile); + + return returnCode; + } + int FileSync(File file) { *** a/src/backend/storage/lmgr/lwlock.c --- b/src/backend/storage/lmgr/lwlock.c *************** *** 80,87 **** NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; * during error recovery. The maximum size could be determined at runtime * if necessary, but it seems unlikely that more than a few locks could * ever be held simultaneously. */ ! #define MAX_SIMUL_LWLOCKS 100 static int num_held_lwlocks = 0; static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; --- 80,90 ---- * during error recovery. The maximum size could be determined at runtime * if necessary, but it seems unlikely that more than a few locks could * ever be held simultaneously. + * + * Must be at least 2 * MAX_BATCHED_WRITES, since bgwriter holds 2 + * lwlocks for each buffer that it is batching. */ ! #define MAX_SIMUL_LWLOCKS 150 static int num_held_lwlocks = 0; static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; *** a/src/backend/storage/smgr/md.c --- b/src/backend/storage/smgr/md.c *************** *** 727,732 **** mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, --- 727,774 ---- } /* + * Write out a list of buffers, as specified in writeList. If + * doubleWriteFile is > 0, then also do double writes to the specified + * file (so full_page_writes can be avoided). + */ + void + mdbwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf, bool isTemp) + { + off_t seekpos; + int nbytes; + MdfdVec *v; + File fd; + int i; + + for (i = 0; i < writeLen; i++) { + struct SMgrWriteList *w = &(writeList[i]); + + /* This assert is too expensive to have on normally ... 
*/ + #ifdef CHECK_WRITE_VS_EXTEND + Assert(w->blockNum < mdnblocks(w->reln, w->forkNum)); + #endif + v = _mdfd_getseg(w->reln, w->forkNum, w->blockNum, isTemp, EXTENSION_FAIL); + fd = v->mdfd_vfd; + + seekpos = (off_t) BLCKSZ *(w->blockNum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + w->fd = fd; + w->seekPos = seekpos; + w->len = BLCKSZ; + } + + nbytes = FileBwrite(writeLen, writeList, doubleWriteFile, doubleBuf); + if (nbytes < 0) { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("FileBwrite error %d", errno))); + } + } + + + /* * mdnblocks() -- Get the number of blocks stored in a relation. * * Important side effect: all active segments of the relation are opened *** a/src/backend/storage/smgr/smgr.c --- b/src/backend/storage/smgr/smgr.c *************** *** 55,60 **** typedef struct f_smgr --- 55,63 ---- BlockNumber blocknum, char *buffer); void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool isTemp); + void (*smgr_bwrite) (int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf, + bool isTemp); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks, bool isTemp); *************** *** 68,74 **** typedef struct f_smgr static const f_smgr smgrsw[] = { /* magnetic disk */ {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, ! mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync, mdpreckpt, mdsync, mdpostckpt } }; --- 71,77 ---- static const f_smgr smgrsw[] = { /* magnetic disk */ {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, ! mdprefetch, mdread, mdwrite, mdbwrite, mdnblocks, mdtruncate, mdimmedsync, mdpreckpt, mdsync, mdpostckpt } }; *************** *** 92,97 **** static void smgrshutdown(int code, Datum arg); --- 95,102 ---- static void smgr_internal_unlink(RelFileNode rnode, ForkNumber forknum, int which, bool isTemp, bool isRedo); + /* Buffer used to write the double-write header */ + static char doubleBuf[DOUBLE_WRITE_HEADER_SIZE]; /* * smgrinit(), smgrshutdown() -- Initialize or shut down storage *************** *** 486,492 **** smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) */ void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ! char *buffer) { PageHeader p = (PageHeader) buffer; --- 491,497 ---- */ void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ! char *buffer, bool *cksumMismatch) { PageHeader p = (PageHeader) buffer; *************** *** 554,559 **** smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, --- 559,636 ---- } /* + * Write out a list of buffers, as specified in writeList. If + * doubleWriteFile is > 0, then also do double writes to the specified + * file (so full_page_writes can be avoided). 
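+ *
+ * On-disk layout of the double-write file: one DOUBLE_WRITE_HEADER_SIZE
+ * (4 KB) header block containing a checksummed struct DoubleBufHeader
+ * that lists every buffer in the batch (see smgr.h), followed by the
+ * batch's data pages written back to back.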
+ */ + void + smgrbwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, bool isTemp) + { + PageHeader p = NULL; + int i; + + if (page_checksum) { + for (i = 0; i < writeLen; i++) { + struct SMgrWriteList *w = &(writeList[i]); + p = (PageHeader)w->callerBuf; + if (p == NULL) { + /* Use tempbuf in the 1-page case (BufferAlloc) */ + Assert(writeLen == 1); + p = (PageHeader) tempbuf; + w->callerBuf = (char *)tempbuf; + } + ((PageHeader)w->buffer)->cksum = 0; + p->cksum = CopyAndComputeChecksum((uint64 *)p, + (uint64 *)w->buffer, + BLCKSZ, + ChecksumInit(w->reln, + w->forkNum, + w->blockNum)); + Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION); + } + } else { + for (i = 0; i < writeLen; i++) { + p = (PageHeader)writeList[i].buffer; + writeList[i].callerBuf = (char *)p; + p->cksum = INVALID_CKSUM; + Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION); + } + } + + if (doubleWriteFile > 0) { + /* + * Set up the initial double-write page that lists all the + * buffers that will be written to the double-write file. + * This list includes the checksums of all the buffers and is + * checksummed itself. + */ + struct DoubleBufHeader *hdr = (struct DoubleBufHeader *)doubleBuf; + struct DoubleBufItem *item = hdr->items; + for (i = 0; i < writeLen; i++) { + item->rnode = writeList[i].reln->smgr_rnode; + item->forkNum = writeList[i].forkNum; + item->blockNum = writeList[i].blockNum; + p = (PageHeader)writeList[i].callerBuf; + item->cksum = p->cksum; + item->pd_lsn = p->pd_lsn; + item++; + } + hdr->writeLen = writeLen; + hdr->cksum = ComputeChecksum((uint64 *)hdr, DOUBLE_WRITE_HEADER_SIZE, 0); + } + + (*(smgrsw[writeList[0].reln->smgr_which].smgr_bwrite))(writeLen, writeList, + doubleWriteFile, doubleBuf, + isTemp); + if (doubleWriteFile > 0) { + /* Zero out part of header that we filled in. */ + memset(doubleBuf, 0, + (char *)&(((struct DoubleBufHeader *) doubleBuf)->items[writeLen]) - doubleBuf); + } + } + + /* * smgrnblocks() -- Calculate the number of blocks in the * supplied relation. */ *************** *** 672,674 **** smgrpostckpt(void) --- 749,865 ---- (*(smgrsw[i].smgr_post_ckpt)) (); } } + + extern char *double_write_directory; + + /* + * Return the name of the double-write file. Caller must pfree() the result. + */ + char * + smgr_doublewritefilename() + { + char *name; + if (double_write_directory != NULL) { + name = palloc(strlen(double_write_directory) + strlen("/double") + 1); + sprintf(name, "%s/double", double_write_directory); + } else { + name = pstrdup("base/double"); + } + return name; + } + + /* + * Called by postmaster at startup during recovery to read the double-write + * file and see if there are any data blocks to be recovered. 
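+ *
+ * Recovery outline: validate the header's own checksum; then, for each
+ * slot, re-read the target block from the database file, and if its
+ * checksum fails (a torn page) but the copy in the double-write file
+ * matches the checksum recorded in the header, overwrite the torn block
+ * with that copy. Blocks that verify correctly are left alone.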
+ */ + void + smgr_recoverdoublewritefile() + { + char *name; + struct stat stat_buf; + File fd; + int r; + struct DoubleBufHeader *hdr; + int i; + uint32 savedCksum; + + name = smgr_doublewritefilename(); + if (stat(name, &stat_buf) == -1) { + elog(LOG, "No double-write file"); + pfree(name); + return; + } + fd = PathNameOpenFile(name, O_RDONLY, S_IRUSR | S_IWUSR); + if (fd < 0) { + elog(PANIC, "Double-write file %s exists but can't be opened", name); + } + r = FileRead(fd, doubleBuf, DOUBLE_WRITE_HEADER_SIZE); + if (r != DOUBLE_WRITE_HEADER_SIZE) { + goto badformat; + } + hdr = (struct DoubleBufHeader *)doubleBuf; + if (hdr->writeLen == 0 && hdr->cksum == 0) { + elog(LOG, "Double-write file %s is zeroed out", name); + FileClose(fd); + pfree(name); + return; + } + if (hdr->writeLen < 1 || hdr->writeLen > MAX_BATCHED_WRITES) { + goto badformat; + } + savedCksum = hdr->cksum; + hdr->cksum = 0; + if (savedCksum != + ComputeChecksum((uint64 *)hdr, DOUBLE_WRITE_HEADER_SIZE, 0)) { + goto badformat; + } + + for (i = 0; i < hdr->writeLen; i++) { + struct DoubleBufItem *it = &(hdr->items[i]); + SMgrRelation smgr; + bool mismatch; + PageHeader p; + + /* + * For each block described in the double-write file header, see if + * the block in the database file has a checksum mismatch. If + * so, restore the block from the double-write file if that entry + * has a correct checksum. + */ + smgr = smgropen(it->rnode); + mismatch = false; + smgrread(smgr, it->forkNum, it->blockNum, (char *)tempbuf, &mismatch); + if (mismatch) { + FileSeek(fd, DOUBLE_WRITE_HEADER_SIZE + i * BLCKSZ, SEEK_SET); + FileRead(fd, (char *)tempbuf, BLCKSZ); + p = (PageHeader)tempbuf; + savedCksum = p->cksum; + p->cksum = 0; + if (savedCksum != it->cksum || + savedCksum != ComputeChecksum((uint64 *)tempbuf, BLCKSZ, + ChecksumInit(smgr, it->forkNum, + it->blockNum))) { + elog(LOG, "Block %s/%d has checksum error, but can't be fixed, because slot %d of double-write file %s looks invalid", + relpath(it->rnode, it->forkNum), it->blockNum, i, name); + } else { + Assert(XLByteEQ(p->pd_lsn, it->pd_lsn)); + smgrwrite(smgr, it->forkNum, it->blockNum, (char *)tempbuf, false); + elog(LOG, "Fixed block %s/%d (which had checksum error) from double-write file %s slot %d", + relpath(it->rnode, it->forkNum), it->blockNum, name, i); + } + } else { + elog(DEBUG1, "Skipping slot %d of double-write file because block %s/%d is correct", + i, relpath(it->rnode, it->forkNum), it->blockNum); + } + smgrclose(smgr); + } + FileClose(fd); + pfree(name); + return; + + badformat: + elog(LOG, "Double-write file %s has bad format", name); + FileClose(fd); + pfree(name); + return; + } *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 116,121 **** extern int CommitSiblings; --- 116,123 ---- extern char *default_tablespace; extern char *temp_tablespaces; extern bool synchronize_seqscans; + bool doubleWrites; + char *double_write_directory; extern bool fullPageWrites; extern int ssl_renegotiation_limit; *************** *** 392,397 **** int tcp_keepalives_idle; --- 394,401 ---- int tcp_keepalives_interval; int tcp_keepalives_count; + int batched_buffer_writes = 0; + /* * These variables are all dummies that don't do anything, except in some * cases provide the value for SHOW to display. 
The real state is elsewhere *************** *** 767,772 **** static struct config_bool ConfigureNamesBool[] = --- 771,784 ---- true, NULL, NULL }, { + {"double_writes", PGC_POSTMASTER, DEVELOPER_OPTIONS, + gettext_noop("Writes data pages to a double-write file before writing them to the database files."), + NULL + }, + &doubleWrites, + false, NULL, NULL + }, + { {"silent_mode", PGC_POSTMASTER, LOGGING_WHERE, gettext_noop("Runs the server silently."), gettext_noop("If this parameter is set, the server will automatically run in the " *************** *** 2088,2093 **** static struct config_int ConfigureNamesInt[] = --- 2100,2115 ---- 1024, 100, 102400, NULL, NULL }, + { + {"batched_buffer_writes", PGC_POSTMASTER, DEVELOPER_OPTIONS, + gettext_noop("Number of buffer writes the bgwriter batches together (0 disables batching)."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &batched_buffer_writes, + 0, 0, MAX_BATCHED_WRITES, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL *************** *** 2668,2673 **** static struct config_string ConfigureNamesString[] = --- 2690,2705 ---- "", assign_application_name, NULL }, + { + {"double_write_directory", PGC_POSTMASTER, DEVELOPER_OPTIONS, + gettext_noop("Location of the double-write file."), + NULL, + GUC_REPORT | GUC_NOT_IN_SAMPLE + }, + &double_write_directory, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL *** a/src/backend/utils/resowner/resowner.c --- b/src/backend/utils/resowner/resowner.c *************** *** 515,521 **** ResourceOwnerEnlargeBuffers(ResourceOwner owner) if (owner->buffers == NULL) { ! newmax = 16; owner->buffers = (Buffer *) MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer)); owner->maxbuffers = newmax; --- 515,523 ---- if (owner->buffers == NULL) { ! /* This should be at least MAX_BATCHED_WRITES, since bgwriter can ! * lock that many buffers at one time. */ ! newmax = 64; owner->buffers = (Buffer *) MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer)); owner->maxbuffers = newmax; *** a/src/include/access/heapam.h --- b/src/include/access/heapam.h *************** *** 119,125 **** extern void simple_heap_update(Relation relation, ItemPointer otid, extern void heap_markpos(HeapScanDesc scan); extern void heap_restrpos(HeapScanDesc scan); ! extern void heap_sync(Relation relation); extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec); --- 119,125 ---- extern void heap_markpos(HeapScanDesc scan); extern void heap_restrpos(HeapScanDesc scan); ! extern void heap_sync(Relation relation, bool wasLogged); extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec); *** a/src/include/storage/bufmgr.h --- b/src/include/storage/bufmgr.h *************** *** 178,184 **** extern void PrintBufferLeakWarning(Buffer buffer); extern void CheckPointBuffers(int flags); extern BlockNumber BufferGetBlockNumber(Buffer buffer); extern BlockNumber RelationGetNumberOfBlocks(Relation relation); ! extern void FlushRelationBuffers(Relation rel); extern void FlushDatabaseBuffers(Oid dbid); extern void DropRelFileNodeBuffers(RelFileNode rnode, ForkNumber forkNum, bool istemp, BlockNumber firstDelBlock); --- 178,184 ---- extern void CheckPointBuffers(int flags); extern BlockNumber BufferGetBlockNumber(Buffer buffer); extern BlockNumber RelationGetNumberOfBlocks(Relation relation); ! 
extern void FlushRelationBuffers(Relation rel, bool wasLogged); extern void FlushDatabaseBuffers(Oid dbid); extern void DropRelFileNodeBuffers(RelFileNode rnode, ForkNumber forkNum, bool istemp, BlockNumber firstDelBlock); *************** *** 211,214 **** extern void AtProcExit_LocalBuffers(void); --- 211,216 ---- extern BufferAccessStrategy GetAccessStrategy(BufferAccessStrategyType btype); extern void FreeAccessStrategy(BufferAccessStrategy strategy); + extern void Bufmgr_DoubleWriteInit(void); + #endif *** a/src/include/storage/fd.h --- b/src/include/storage/fd.h *************** *** 53,63 **** typedef int File; --- 53,67 ---- /* GUC parameter */ extern int max_files_per_process; + #define MAX_BATCHED_WRITES 64 + #define DOUBLE_WRITE_HEADER_SIZE 4096 /* * prototypes for functions in fd.c */ + struct SMgrWriteList; + /* Operations on virtual Files --- equivalent to Unix kernel file ops */ extern File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode); extern File OpenTemporaryFile(bool interXact); *************** *** 66,71 **** extern int FilePrefetch(File file, off_t offset, int amount); --- 70,77 ---- extern int FileRead(File file, char *buffer, int amount); extern int FileWrite(File file, char *buffer, int amount); extern int FileSync(File file); + extern int FileBwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf); extern off_t FileSeek(File file, off_t offset, int whence); extern int FileTruncate(File file, off_t offset); extern char *FilePathName(File file); *** a/src/include/storage/smgr.h --- b/src/include/storage/smgr.h *************** *** 16,21 **** --- 16,22 ---- #include "access/xlog.h" #include "fmgr.h" + #include "fd.h" #include "storage/block.h" #include "storage/relfilenode.h" *************** *** 68,73 **** typedef struct SMgrRelationData --- 69,115 ---- typedef SMgrRelationData *SMgrRelation; + /* + * Element of an array specifying a buffer write in a list of buffer + * writes being batched. + */ + struct SMgrWriteList { + int buf_id; + SMgrRelation reln; + ForkNumber forkNum; + BlockNumber blockNum; + char *buffer; + char *callerBuf; + + /* Filled in by mdbwrite */ + File fd; + off_t seekPos; + int len; + }; + + /* + * Description of one buffer in the double-write file, as listed in the + * header. + */ + struct DoubleBufItem { + /* Specification of where this buffer is in the database */ + RelFileNode rnode; /* physical relation identifier */ + ForkNumber forkNum; + BlockNumber blockNum; /* blknum relative to begin of reln */ + /* Checksum of the buffer. */ + int32 cksum; + /* LSN of the buffer */ + XLogRecPtr pd_lsn; + }; + + /* Format of the header of the double-write file */ + struct DoubleBufHeader { + uint32 cksum; + int32 writeLen; + uint32 pad1; + uint32 pad2; + struct DoubleBufItem items[0]; + }; extern void smgrinit(void); extern SMgrRelation smgropen(RelFileNode rnode); *************** *** 84,92 **** extern void smgrextend(SMgrRelation reln, ForkNumber forknum, extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void smgrread(SMgrRelation reln, ForkNumber forknum, ! 
BlockNumber blocknum, char *buffer); extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool isTemp); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks, bool isTemp); --- 126,136 ---- extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void smgrread(SMgrRelation reln, ForkNumber forknum, ! BlockNumber blocknum, char *buffer, bool *cksumMismatch); extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool isTemp); + extern void smgrbwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, bool isTemp); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks, bool isTemp); *************** *** 94,99 **** extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum); --- 138,145 ---- extern void smgrpreckpt(void); extern void smgrsync(void); extern void smgrpostckpt(void); + extern char *smgr_doublewritefilename(void); + extern void smgr_recoverdoublewritefile(void); /* internals: move me elsewhere -- ay 7/94 */ *************** *** 112,117 **** extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, --- 158,165 ---- char *buffer); extern void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool isTemp); + extern void mdbwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf, bool isTemp); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); extern void mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks, bool isTemp);
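For readers who want to poke at the on-disk format, here is a small standalone dump tool. It is a sketch, not part of the patch: it mirrors struct DoubleBufHeader and struct DoubleBufItem from the smgr.h hunk above under the assumptions of a 9.0-era two-field XLogRecPtr, a three-Oid RelFileNode, and the constants from fd.h, and it makes no attempt to re-verify checksums, since ComputeChecksum() is internal to the companion checksum patch.

    /* dw_dump.c -- hypothetical inspection tool; mirrors the patch's layout */
    #include <stdio.h>
    #include <stdint.h>

    #define DOUBLE_WRITE_HEADER_SIZE 4096   /* matches fd.h above */
    #define MAX_BATCHED_WRITES 64

    typedef struct { uint32_t spcNode, dbNode, relNode; } RelFileNode;
    typedef struct { uint32_t xlogid, xrecoff; } XLogRecPtr;    /* 9.0-era layout */

    typedef struct {
        RelFileNode rnode;      /* which relation the page belongs to */
        int32_t     forkNum;
        uint32_t    blockNum;
        int32_t     cksum;      /* page checksum recorded at write time */
        XLogRecPtr  pd_lsn;
    } DoubleBufItem;

    typedef struct {
        uint32_t    cksum;      /* checksum of this header block */
        int32_t     writeLen;   /* number of valid slots */
        uint32_t    pad1;
        uint32_t    pad2;
        DoubleBufItem items[];
    } DoubleBufHeader;

    int
    main(int argc, char **argv)
    {
        static union {
            char bytes[DOUBLE_WRITE_HEADER_SIZE];
            DoubleBufHeader hdr;
        } u;
        DoubleBufHeader *hdr = &u.hdr;
        FILE *f = fopen(argc > 1 ? argv[1] : "base/double", "rb");
        int i;

        if (f == NULL || fread(u.bytes, 1, sizeof(u.bytes), f) != sizeof(u.bytes))
        {
            perror("reading double-write file");
            return 1;
        }
        if (hdr->writeLen < 1 || hdr->writeLen > MAX_BATCHED_WRITES)
        {
            /* an all-zero header means no batch is pending recovery */
            printf("empty or invalid header (writeLen = %d)\n", hdr->writeLen);
            fclose(f);
            return 0;
        }
        printf("header cksum 0x%08x, %d slot(s)\n",
               (unsigned) hdr->cksum, hdr->writeLen);
        for (i = 0; i < hdr->writeLen; i++)
        {
            DoubleBufItem *it = &hdr->items[i];

            /* slot i's page copy lives at DOUBLE_WRITE_HEADER_SIZE + i * BLCKSZ */
            printf("slot %d: rel %u/%u/%u fork %d block %u cksum 0x%08x lsn %X/%X\n",
                   i, (unsigned) it->rnode.spcNode, (unsigned) it->rnode.dbNode,
                   (unsigned) it->rnode.relNode, (int) it->forkNum,
                   (unsigned) it->blockNum, (unsigned) it->cksum,
                   (unsigned) it->pd_lsn.xlogid, (unsigned) it->pd_lsn.xrecoff);
        }
        fclose(f);
        return 0;
    }

Build and run as, e.g., "cc -o dw_dump dw_dump.c && ./dw_dump $PGDATA/base/double". The mirrored structs happen to have no internal padding on common ABIs, so the layout should match, but this is a debugging aid, not a format definition.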