*** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** *** 5722,5737 **** heap2_desc(StringInfo buf, uint8 xl_info, char *rec) * That behavior might change someday, but in any case it's likely that * any fsync decisions required would be per-index and hence not appropriate * to be done here.) */ void ! heap_sync(Relation rel) { /* non-WAL-logged tables never need fsync */ if (!RelationNeedsWAL(rel)) return; /* main heap */ ! FlushRelationBuffers(rel); /* FlushRelationBuffers will have opened rd_smgr */ smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM); --- 5722,5741 ---- * That behavior might change someday, but in any case it's likely that * any fsync decisions required would be per-index and hence not appropriate * to be done here.) + * + * If needsDoubleWrite is true, the changed buffers should use + * double-writing if that option is enabled. Currently, relation changes + * that were not WAL-logged do not need double writes. */ void ! heap_sync(Relation rel, bool needsDoubleWrite) { /* non-WAL-logged tables never need fsync */ if (!RelationNeedsWAL(rel)) return; /* main heap */ ! FlushRelationBuffers(rel, needsDoubleWrite); /* FlushRelationBuffers will have opened rd_smgr */ smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM); *************** *** 5743,5749 **** heap_sync(Relation rel) Relation toastrel; toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock); ! FlushRelationBuffers(toastrel); smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM); heap_close(toastrel, AccessShareLock); } --- 5747,5753 ---- Relation toastrel; toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock); ! FlushRelationBuffers(toastrel, needsDoubleWrite); smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM); heap_close(toastrel, AccessShareLock); } *** a/src/backend/access/heap/rewriteheap.c --- b/src/backend/access/heap/rewriteheap.c *************** *** 288,294 **** end_heap_rewrite(RewriteState state) * wrote before the checkpoint. */ if (RelationNeedsWAL(state->rs_new_rel)) ! heap_sync(state->rs_new_rel); /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); --- 288,294 ---- * wrote before the checkpoint. */ if (RelationNeedsWAL(state->rs_new_rel)) ! heap_sync(state->rs_new_rel, state->rs_use_wal); /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 6256,6261 **** StartupXLOG(void) --- 6256,6269 ---- InRecovery = true; } + /* + * If double write file exists, see if there are any pages to be recovered + * because of torn writes. This must be done whether or not double_writes + * or full_page_writes is currently enabled, in order to recover any torn + * pages if double_writes was enabled during last crash. + */ + RecoverDoubleWriteFile(); + /* REDO */ if (InRecovery) { *** a/src/backend/commands/copy.c --- b/src/backend/commands/copy.c *************** *** 2138,2144 **** CopyFrom(CopyState cstate) * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel); return processed; } --- 2138,2144 ---- * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel, false); return processed; } *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** *** 3759,3765 **** ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) /* If we skipped writing WAL, then we need to sync the heap. 
*/ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(newrel); heap_close(newrel, NoLock); } --- 3759,3765 ---- /* If we skipped writing WAL, then we need to sync the heap. */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(newrel, false); heap_close(newrel, NoLock); } *************** *** 8384,8390 **** ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode) * in shared buffers. We assume no new changes will be made while we are * holding exclusive lock on the rel. */ ! FlushRelationBuffers(rel); /* * Relfilenodes are not unique across tablespaces, so we need to allocate --- 8384,8390 ---- * in shared buffers. We assume no new changes will be made while we are * holding exclusive lock on the rel. */ ! FlushRelationBuffers(rel, true); /* * Relfilenodes are not unique across tablespaces, so we need to allocate *************** *** 8486,8492 **** copy_relation_data(SMgrRelation src, SMgrRelation dst, /* If we got a cancel signal during the copy of the data, quit */ CHECK_FOR_INTERRUPTS(); ! smgrread(src, forkNum, blkno, buf); /* XLOG stuff */ if (use_wal) --- 8486,8492 ---- /* If we got a cancel signal during the copy of the data, quit */ CHECK_FOR_INTERRUPTS(); ! smgrread(src, forkNum, blkno, buf, NULL); /* XLOG stuff */ if (use_wal) *** a/src/backend/executor/execMain.c --- b/src/backend/executor/execMain.c *************** *** 2690,2696 **** CloseIntoRel(QueryDesc *queryDesc) /* If we skipped using WAL, must heap_sync before commit */ if (myState->hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(myState->rel); /* close rel, but keep lock until commit */ heap_close(myState->rel, NoLock); --- 2690,2696 ---- /* If we skipped using WAL, must heap_sync before commit */ if (myState->hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(myState->rel, false); /* close rel, but keep lock until commit */ heap_close(myState->rel, NoLock); *** a/src/backend/postmaster/postmaster.c --- b/src/backend/postmaster/postmaster.c *************** *** 314,319 **** extern char *optarg; --- 314,323 ---- extern int optind, opterr; + extern int page_checksum; + extern int doubleWrites; + extern int batched_buffer_writes; + #ifdef HAVE_INT_OPTRESET extern int optreset; /* might not be declared by system headers */ #endif *************** *** 765,770 **** PostmasterMain(int argc, char *argv[]) --- 769,786 ---- ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); + /* The double-write option currently requires data page checksums. */ + if (doubleWrites && !page_checksum) + ereport(ERROR, + (errmsg("page_checksum must be enabled if double_writes is enabled"))); + + /* + * The double-write option requires batching of buffer writes, so force + * batched_buffer_writes to a default 32 if not already set. + */ + if (doubleWrites && batched_buffer_writes == 0) + batched_buffer_writes = 32; + /* * Other one-time internal sanity checks can go here, if they are fast. * (Put any slow processing further down, after postmaster.pid creation.) *** /dev/null --- b/src/backend/storage/buffer/README.doublewrites *************** *** 0 **** --- 1,193 ---- + DOUBLE WRITES + + The "double_write" GUC option enables the use of double write + functionality, which can be used as a replacement for full page writes. + The idea of this option is to handle the problem of torn writes for + buffer pages by writing (almost) all buffers twice, once to a + double-write file and once to the data file. 
The two writes are done in + a strictly sequential order, ensuring that the buffer is successfully + written to the double-write file before being written to the data file. + The "double_write" option (like full page writes) is only a crash + recovery feature -- it does not affect the database contents unless there + is a crash. + + The double-write file has checksummed contents, and in our + implementation, the pages in the data files are also checksummed, + though this is not strictly required. If a crash occurs while buffer + writes (using double writes) are in progress, then a buffer may have + a torn page in either the double-write file or in its data file. + However, because of the ordering of writes, there can only be a torn page + for that buffer in the double-write file or the data file, not both. + Therefore, during crash recovery, we can scan the double-write file. If + a page in the double-write file has a correct checksum, and the + corresponding page in the data file has an incorrect checksum, then we + can restore the torn data page from the double-write file. Any pages in + the double-write file that have an incorrect checksum are ignored (since + they are likely torn pages). + + The net result is that the "double_write" option fixes all torn data pages, + and can therefore be used in place of "full_page_writes". Using + double_writes typically improves performance, as compared to + "full_page_writes", and also greatly reduces the size of the WAL log. + + As currently written, the double_write option makes use of checksums on + the data pages. Double writes only strictly require that the pages in + the double-write file be checksummed, and we could fairly easily change + the implementation to make data checksums optional. However, if data + checksums are used, then Postgres can provide more useful messages on + exactly when torn pages have occurred. It is very likely that a torn + page happened if, during recovery, the checksum of a data page is + incorrect, but a copy of the page with a valid checksum is in the + double-write file. If there are no data checksums, then Postgres would + still copy any valid page in the double-write file to the + appropriate page in a data file, but it cannot actually know if a torn + page occurred. + + + BATCHING WRITES FOR EFFICIENCY + + Our implementation achieves efficiency for double writes by having the + checkpointer and bgwriter batch writes of multiple pages together. + Currently, there is an option "batched_buffer_writes" that specifies how + many buffers to batch at a time. If double_writes is enabled, + batched_buffer_writes is forced to a default value of 32 if it is not + already set. We could choose to completely remove batched_buffer_writes + as an option. + + In order to batch multiple buffer writes without copying, the + checkpointer/bgwriter must acquire multiple buffer locks simultaneously + as it is building up the batch. Because it is acquiring multiple buffer + locks, there is the possibility of deadlock. We avoid the possibility of + deadlock by doing a conditional acquire whenever we are acquiring the + buffer content_lock for a batch. If the conditional acquire fails (and + we already have acquired some buffer locks), then we push out the current + batch immediately, so that we can release those buffer locks. We are + then guaranteed that we cannot deadlock as we again try to acquire the + lock. The situation of a failed conditional acquire almost never happens + in practice.
The maximum number of batched buffers is set at + MAX_BATCHED_WRITES. The MAX_SIMUL_LWLOCKS limit in lwlock.c must be + increased to be at least 2 * MAX_BATCHED_WRITES, since the + checkpointer/bgwriter holds two lwlocks for each buffer that it is + batching. + + The actual batch writes are done using writev(), which might have to be + replaced with equivalent code, if this is a portability issue. A struct + iocb is currently used for bookkeeping during the low-level + batching, since it is compatible with an async IO approach as well (not + included). + + Given the batching functionality, double writes by the + checkpointer/bgwriter are implemented efficiently by writing a batch of + pages to the double-write file and fsyncing, and then writing the pages + to the appropriate data files, and then fsyncing all the necessary data + files. While the data fsyncing might be viewed as expensive, it does + help eliminate a lot of the fsync overhead at the end of checkpoints. + FlushRelationBuffers() and FlushDatabaseBuffers() can be similarly + batched. The maximum size of the double-write file is MAX_BATCHED_WRITES + * BLCKSZ. + + Note that direct IO (the O_DIRECT option in Linux) could be used for the + double-write file, if there were no portability concerns. In that case, + we write the whole set of buffers to the double-write file in a single + direct write, and no fsync is necessary. We would avoid copying + overhead, and any use of the buffer cache. We would in that case need to + make sure that the double-write file is fully-allocated ahead of time + (which is the only case when Linux actually guarantees that IO will be + direct to disk). Or we could do the fsync just to guarantee that + the IO makes it to disk. + + The fsyncing of the data files is necessary to be sure that the + double-write file can be reused for the next batch of buffers. You must + be sure that the buffers have been successfully written to the data files + on disk before reusing the double-write file. Strictly speaking, the + data files only need to be fsync'ed by the time that the double-write + file must be used again. However, there is probably not much to be + gained by delaying the fsync unless multiple double-write files are used + (see below). + + We have some other code (not included) that sorts buffers to be + checkpointed in file/block order -- this can reduce fsync overhead + further by ensuring that each batch writes to only one or a few data + files, and potentially sequential or nearby locations in the data files. + + + DOUBLE-WRITE FILE + + When we write the double-write file, we include a header that identifies + each block in the double-write file and a checksum for each block (and is + checksummed itself). We use this header during recovery to determine the + contents of the double-write file and whether each block was fully + written (i.e. not torn). The length of the double-write header is set at + 4096 bytes, and must be large enough to hold information on + MAX_BATCHED_WRITES blocks. (See Assert() in smgrbwrite().) A sketch of + one possible header layout appears below. + + + WHEN TO DO DOUBLE WRITES + + Double writes must be done for any page which might be used after + recovery even if there was a full crash while writing the page. This + includes all writes to such pages in a checkpoint, not just the first + write. Pages in temporary tables and some unlogged operations do not + require double writes. This is controlled by flags in heap_sync() and + FlushRelationBuffers().
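+ As a concrete illustration of the header described in the DOUBLE-WRITE
+ FILE section above, here is a minimal sketch of one possible layout for
+ the double-write file header, inferred from the way smgrbwrite() fills
+ the header in and RecoverDoubleWriteFile() reads it back.  The field
+ order and padding shown here are assumptions for illustration only; the
+ actual definitions of struct DoubleBufHeader and struct DoubleBufItem
+ appear elsewhere in the full patch (not in this excerpt).
+
+     /* One entry per block written to the double-write file. */
+     struct DoubleBufItem
+     {
+         RelFileNodeBackend rnode;   /* relation containing the block */
+         ForkNumber  forkNum;        /* fork containing the block */
+         BlockNumber blockNum;       /* block number within the fork */
+         uint32      cksum;          /* checksum of the block image */
+         XLogRecPtr  pd_lsn;         /* page LSN at the time of the write */
+     };
+
+     /* Fixed-size (DOUBLE_WRITE_HEADER_SIZE) header at file offset 0. */
+     struct DoubleBufHeader
+     {
+         uint32      writeLen;       /* number of valid items[] entries */
+         uint32      cksum;          /* checksum of this header, computed
+                                      * with this field zeroed */
+         struct DoubleBufItem items[1];  /* var-length: writeLen entries */
+     };
+
+ During recovery, the header checksum is verified first; only then are the
+ individual items consulted to decide which data pages (if any) should be
+ restored from the block copies that follow the header in the file.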
+ We do have to do the same double write for dirty buffer evictions by + individual backends. This could be expensive, if there are a lot of + dirty buffer evictions (i.e. where the checkpointer/bgwriter cannot generate + enough clean pages for the backends). + + There is currently just one double-write file, which is protected by a + global lock DoubleWriteLock. When the checkpointer, bgwriter, or a + backend wants to double-write a batch of buffers, it acquires + DoubleWriteLock, double-writes the batch, and releases the lock. In + order to avoid contention (when there are many dirty buffer evictions), + it might be useful to have one double write file for each backend and for + the checkpointer and bgwriter. In that case, the DoubleWriteLock would + no longer be needed, but all double-write files would have to be examined + during recovery. + + + PERFORMANCE + + We have seen significant performance gains for OLTP runs with sufficient + buffer cache size. In these runs, there are few dirty evictions, and the + checkpointer is doing almost all of the buffer writes. In that case, the + number of writes is not being greatly increased. We are doing each + buffer write twice, but we are eliminating the full page write that goes to + the WAL log for each modification of a buffer in a checkpoint. Also, the + double-write file is written sequentially in a single write, so it is + highly efficient. Though we are doing many fsyncs, we are doing them in + the context of the checkpointer (and possibly in the bgwriter). + Meanwhile, we have removed the latency added to each transaction when the + backend must do a full page write to the WAL log for the first + modification of a buffer. + + OPTIONS + + The three options in this patch are double_writes, + double_write_directory, and batched_buffer_writes. + + double_writes controls whether double writes are used for buffer writes. + It requires that page_checksum is enabled, and that batched_buffer_writes + is non-zero. As mentioned above, we could just force + batched_buffer_writes to a default value (32) if double_writes is + enabled. Generally, the user would turn off full_page_writes if + double_writes is enabled. + + double_write_directory controls the directory where the double-write file + is located. If it is not set, then the double-write file is in the base + directory. This option could be used to put the double-write file on some + high-speed media (such as SSD). + + The batched_buffer_writes option controls whether buffer writes are + batched, and the maximum number of buffers that are batched at one time. + + + - Explain may be slower on ext2/ext3 because of all the fsyncs. (We + find it is OK, especially if logs and data are on separate ext3s.) + + TODO: + - fix resowner.c code + - Make fd.c code cleaner (don't use struct iocb) + - add back in ioseq code (sorting buffers before syncing) + - changes to remember if individual buffer doesn't need double-write? *** a/src/backend/storage/buffer/bufmgr.c --- b/src/backend/storage/buffer/bufmgr.c *************** *** 45,50 **** --- 45,51 ---- #include "storage/proc.h" #include "storage/smgr.h" #include "storage/standby.h" + #include "utils/memutils.h" #include "utils/rel.h" #include "utils/resowner.h" #include "utils/timestamp.h" *************** *** 75,82 **** double bgwriter_lru_multiplier = 2.0; */ int target_prefetch_pages = 0; /* local state for StartBufferIO and related functions */ !
static volatile BufferDesc *InProgressBuf = NULL; static bool IsForInput; /* local state for LockBufferForCleanup */ --- 76,87 ---- */ int target_prefetch_pages = 0; + extern bool doubleWrites; + /* local state for StartBufferIO and related functions */ ! /* Stack of buffers on which we have called StartBufferIO(). */ ! static volatile BufferDesc *InProgressBuf[MAX_BATCHED_WRITES]; ! static int InProgressIndex = 0; static bool IsForInput; /* local state for LockBufferForCleanup */ *************** *** 107,112 **** static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, --- 112,125 ---- static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); static void AtProcExit_Buffers(int code, Datum arg); + static void FlushBatch(bool doUnpin); + static int SyncOneBufferPrepare(int buf_id, bool skip_recently_used); + static int SyncOneBufferBatched(int buf_id, bool doBatch); + static bool FlushBufferPrepare(int buf_id, volatile BufferDesc *buf, + char *callerBuf, + SMgrRelation reln); + static void CheckDoubleWriteFile(void); + /* * PrefetchBuffer -- initiate asynchronous read of a block of a relation *************** *** 437,443 **** ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, MemSet((char *) bufBlock, 0, BLCKSZ); else { ! smgrread(smgr, forkNum, blockNum, (char *) bufBlock); /* check for garbage data */ if (!PageHeaderIsValid((PageHeader) bufBlock)) --- 450,456 ---- MemSet((char *) bufBlock, 0, BLCKSZ); else { ! smgrread(smgr, forkNum, blockNum, (char *) bufBlock, NULL); /* check for garbage data */ if (!PageHeaderIsValid((PageHeader) bufBlock)) *************** *** 651,658 **** BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, smgr->smgr_rnode.node.dbNode, smgr->smgr_rnode.node.relNode); ! FlushBuffer(buf, NULL); ! LWLockRelease(buf->content_lock); TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum, smgr->smgr_rnode.node.spcNode, --- 664,693 ---- smgr->smgr_rnode.node.dbNode, smgr->smgr_rnode.node.relNode); ! if (doubleWrites) ! { ! CheckDoubleWriteFile(); ! ! /* ! * Make a one-element batch, so we can do the double ! * writes. We are serializing on the double write file in ! * FlushBatch(). We could also have a small double write ! * file for each backend. ! */ ! if (FlushBufferPrepare(buf->buf_id, buf, NULL, NULL)) ! { ! FlushBatch(false); ! } ! else ! { ! LWLockRelease(buf->content_lock); ! } ! } ! else ! { ! FlushBuffer(buf, NULL); ! LWLockRelease(buf->content_lock); ! } TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum, smgr->smgr_rnode.node.spcNode, *************** *** 1165,1170 **** UnpinBuffer(volatile BufferDesc *buf, bool fixOwner) --- 1200,1310 ---- } } + extern bool page_checksum; + extern int batched_buffer_writes; + + /* File to which double writes are going */ + File doubleWriteFile = 0; + + /* + * Pointer to array of buffers in to which to do checksumming of + * batched buffers. + */ + char *origCksumBuf, + *cksumBuf; + + /* Array describing the buffer writes that are being batched. */ + struct SMgrWriteList writeList[MAX_BATCHED_WRITES]; + + /* Number of buffers in the current batch so far. */ + int writeIndex; + + + + /* + * Allocate the buffers used for checksumming a batch of writes. 
+ */ + static void + AllocCksumBuf() + { + if (batched_buffer_writes > 0 && page_checksum && cksumBuf == NULL) + { + origCksumBuf = (char *) palloc(batched_buffer_writes * BLCKSZ + ALIGNOF_BUFFER); + cksumBuf = (char *) TYPEALIGN(ALIGNOF_BUFFER, origCksumBuf); + } + } + + /* + * Release the buffers used for checksumming a batch of writes. + */ + static void + FreeCksumBuf() + { + if (cksumBuf != NULL) + { + pfree(origCksumBuf); + cksumBuf = origCksumBuf = NULL; + } + } + + /* + * If double_writes is on, and the double-write file is not yet open in + * this process, open up the double-write file. + */ + static void + CheckDoubleWriteFile() + { + if (doubleWrites) + { + if (doubleWriteFile == 0) + { + char *name = DoubleWriteFileName(); + + doubleWriteFile = PathNameOpenFile(name, + O_RDWR | O_CREAT, + S_IRUSR | S_IWUSR); + if (doubleWriteFile < 0) + elog(PANIC, "Couldn't open double-write file %s", name); + pfree(name); + } + } + } + + /* + * Write out the current batch of buffers, doing a double write if + * required, and then release and unlock the buffers in the batch. + * Only unpin the buffers if doUnpin is true. + */ + static void + FlushBatch(bool doUnpin) + { + int j; + + if (doubleWrites) + LWLockAcquire(DoubleWriteLock, LW_EXCLUSIVE); + /* XXX should we set an error context, like FlushBuffer()? */ + smgrbwrite(writeIndex, writeList, doubleWriteFile); + if (doubleWrites) + LWLockRelease(DoubleWriteLock); + + + /* + * Release buffers in reverse order, so InProgressBuf[] is managed + * efficiently. + */ + for (j = writeIndex - 1; j >= 0; j--) + { + volatile BufferDesc *bufHdr = &BufferDescriptors[writeList[j].buf_id]; + + pgBufferUsage.shared_blks_written++; + TerminateBufferIO(bufHdr, true, 0); + LWLockRelease(bufHdr->content_lock); + if (doUnpin) + UnpinBuffer(bufHdr, true); + } + writeIndex = 0; + } + /* * BufferSync -- Write out all dirty buffers in the pool. * *************** *** 1183,1188 **** BufferSync(int flags) --- 1323,1338 ---- int num_written; int mask = BM_DIRTY; + #define DBLWRITE_DEBUG + #ifdef DBLWRITE_DEBUG + int64 targetTime, + now, + start; + int cur_num_written; + #endif + bool batched; + MemoryContext oldcontext; + /* Make sure we can handle the pin inside SyncOneBuffer */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); *************** *** 1234,1239 **** BufferSync(int flags) --- 1384,1406 ---- TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write); + #ifdef DBLWRITE_DEBUG + if (log_checkpoints) + elog(LOG, "checkpoint: %d buffers to write", num_to_write); + /* Initialize some debug code for watching progress of the checkpoint. */ + start = GetCurrentTimestamp(); + /* Print out message every 30 seconds */ + targetTime = start + 30000000; + cur_num_written = 0; + #endif + + Assert(writeIndex == 0); + batched = (batched_buffer_writes > 0); + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + AllocCksumBuf(); + MemoryContextSwitchTo(oldcontext); + CheckDoubleWriteFile(); + /* * Loop over all buffers again, and write the ones (still) marked with * BM_CHECKPOINT_NEEDED. In this loop, we start at the clock sweep point *************** *** 1263,1269 **** BufferSync(int flags) */ if (bufHdr->flags & BM_CHECKPOINT_NEEDED) { ! if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN) { TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); BgWriterStats.m_buf_written_checkpoints++; --- 1430,1440 ---- */ if (bufHdr->flags & BM_CHECKPOINT_NEEDED) { ! int r; ! ! r = SyncOneBufferBatched(buf_id, false); ! ! 
if (r & BUF_WRITTEN) { TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); BgWriterStats.m_buf_written_checkpoints++; *************** *** 1283,1298 **** BufferSync(int flags) if (num_written >= num_to_write) break; ! /* ! * Sleep to throttle our I/O rate. ! */ ! CheckpointWriteDelay(flags, (double) num_written / num_to_write); } } if (++buf_id >= NBuffers) buf_id = 0; } /* * Update checkpoint statistics. As noted above, this doesn't include --- 1454,1494 ---- if (num_written >= num_to_write) break; ! if (!batched || writeIndex == 0) ! { ! /* ! * Sleep to throttle our I/O rate. ! */ ! CheckpointWriteDelay(flags, ! (double) num_written / num_to_write); ! } } } + #ifdef DBLWRITE_DEBUG + if (log_checkpoints && + ((now = GetCurrentTimestamp()) >= targetTime || + num_to_scan == 0)) + { + /* Print out a checkpoint progress message */ + int progress = num_written * 100 / num_to_write; + int target = (now - start) * 100 / (int) (1000000 * CheckPointTimeout * CheckPointCompletionTarget); + + elog(LOG, "%d written/s, %d%%, target %d%%", + (num_written - cur_num_written) / 30, + progress, target); + cur_num_written = num_written; + targetTime = now + 30000000; + } + #endif + if (++buf_id >= NBuffers) buf_id = 0; } + if (writeIndex > 0) + { + FlushBatch(true); + } /* * Update checkpoint statistics. As noted above, this doesn't include *************** *** 1347,1352 **** BgBufferSync(void) --- 1543,1550 ---- int num_to_scan; int num_written; int reusable_buffers; + bool batched; + MemoryContext oldcontext; /* * Find out where the freelist clock sweep currently is, and how many *************** *** 1527,1536 **** BgBufferSync(void) num_written = 0; reusable_buffers = reusable_buffers_est; /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { ! int buffer_state = SyncOneBuffer(next_to_clean, true); if (++next_to_clean >= NBuffers) { --- 1725,1742 ---- num_written = 0; reusable_buffers = reusable_buffers_est; + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + AllocCksumBuf(); + MemoryContextSwitchTo(oldcontext); + CheckDoubleWriteFile(); + + batched = (batched_buffer_writes > 0); /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { ! int buffer_state; ! ! buffer_state = SyncOneBufferBatched(next_to_clean, batched); if (++next_to_clean >= NBuffers) { *************** *** 1551,1556 **** BgBufferSync(void) --- 1757,1765 ---- else if (buffer_state & BUF_REUSABLE) reusable_buffers++; } + /* If we have an incomplete batch, write it out. */ + if (batched && writeIndex > 0) + FlushBatch(true); BgWriterStats.m_buf_written_clean += num_written; *************** *** 1587,1609 **** BgBufferSync(void) } /* ! * SyncOneBuffer -- process a single buffer during syncing. ! * ! * If skip_recently_used is true, we don't write currently-pinned buffers, nor ! * buffers marked recently used, as these are not replacement candidates. ! * ! * Returns a bitmask containing the following flag bits: ! * BUF_WRITTEN: we wrote the buffer. ! * BUF_REUSABLE: buffer is available for replacement, ie, it has ! * pin count 0 and usage count 0. ! * ! * (BUF_WRITTEN could be set in error if FlushBuffers finds the buffer clean ! * after locking it, but we don't care all that much.) ! * ! * Note: caller must have done ResourceOwnerEnlargeBuffers. */ static int ! SyncOneBuffer(int buf_id, bool skip_recently_used) { volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; int result = 0; --- 1796,1805 ---- } /* ! * Prepare for syncing out one buffer. */ static int ! 
SyncOneBufferPrepare(int buf_id, bool skip_recently_used) { volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; int result = 0; *************** *** 1640,1649 **** SyncOneBuffer(int buf_id, bool skip_recently_used) * buffer is clean by the time we've locked it.) */ PinBuffer_Locked(bufHdr); LWLockAcquire(bufHdr->content_lock, LW_SHARED); - FlushBuffer(bufHdr, NULL); - LWLockRelease(bufHdr->content_lock); UnpinBuffer(bufHdr, true); --- 1836,1876 ---- * buffer is clean by the time we've locked it.) */ PinBuffer_Locked(bufHdr); + + return result | BUF_WRITTEN; + } + + /* + * SyncOneBuffer -- process a single buffer during syncing. + * + * If skip_recently_used is true, we don't write currently-pinned buffers, nor + * buffers marked recently used, as these are not replacement candidates. + * + * Returns a bitmask containing the following flag bits: + * BUF_WRITTEN: we wrote the buffer. + * BUF_REUSABLE: buffer is available for replacement, ie, it has + * pin count 0 and usage count 0. + * BUF_CHECKPOINT: buffer has BM_CHECKPOINT_NEEDED set. + * + * (BUF_WRITTEN could be set in error if FlushBuffers finds the buffer clean + * after locking it, but we don't care all that much.) + * + * Note: caller must have done ResourceOwnerEnlargeBuffers. + */ + static int + SyncOneBuffer(int buf_id, bool skip_recently_used) + { + volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; + int result; + + result = SyncOneBufferPrepare(buf_id, skip_recently_used); + if ((result & BUF_WRITTEN) == 0) + { + return result; + } + LWLockAcquire(bufHdr->content_lock, LW_SHARED); FlushBuffer(bufHdr, NULL); LWLockRelease(bufHdr->content_lock); UnpinBuffer(bufHdr, true); *************** *** 1652,1657 **** SyncOneBuffer(int buf_id, bool skip_recently_used) --- 1879,1943 ---- /* + * Sync a buffer, or prepare to sync a buffer in a batch if doBatch is + * true. Actually flush the batch of writes if we have reached now have + * batched_buffer_writes buffers in the batch. + */ + static int + SyncOneBufferBatched(int buf_id, bool doBatch) + { + int r; + volatile BufferDesc *bufHdr; + + if (!doBatch) + { + return SyncOneBuffer(buf_id, false); + } + r = SyncOneBufferPrepare(buf_id, false); + if (!(r & BUF_WRITTEN)) + { + return r; + } + bufHdr = &BufferDescriptors[buf_id]; + retry: + if (writeIndex == 0) + { + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + } + else + { + /* + * If we are already holding some buffer content locks (because of + * batching multiple writes), then acquire the next content lock + * conditionally, so as to avoid deadlock. If we can't acquire it, + * then flush the current batch (which will reset writeIndex to zero + * as a consequence) and then retry (and now we won't have to acquire + * it conditionally). + */ + if (!LWLockConditionalAcquire(bufHdr->content_lock, LW_SHARED)) + { + FlushBatch(true); + goto retry; + } + } + if (!FlushBufferPrepare(bufHdr->buf_id, bufHdr, + cksumBuf + writeIndex * BLCKSZ, NULL)) + { + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true); + } + + /* + * If we now have a full batch, then flush it out the batch. + */ + if (writeIndex == batched_buffer_writes) + { + FlushBatch(true); + } + return r | BUF_WRITTEN; + } + + /* * AtEOXact_Buffers - clean up at end of transaction. * * As of PostgreSQL 8.0, buffer pins should get released by the *************** *** 1932,1937 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) --- 2218,2339 ---- } /* + * Prepare to flush out a buffer and record it as part of a batch (add it + * to writeList[]). 
+ */ + static bool + FlushBufferPrepare(int buf_id, volatile BufferDesc *buf, char *callerBuf, + SMgrRelation reln) + { + XLogRecPtr recptr; + + Assert(LWLockHeldByMe(buf->content_lock)); + + /* + * Acquire the buffer's io_in_progress lock. If StartBufferIO returns + * false, then someone else flushed the buffer before we could, so we need + * not do anything. + */ + if (!StartBufferIO(buf, false)) + return FALSE; + + /* Find smgr relation for buffer */ + if (reln == NULL) + { + reln = smgropen(buf->tag.rnode, InvalidBackendId); + smgrsettransient(reln); + } + + /* + * Force XLOG flush up to buffer's LSN. This implements the basic WAL + * rule that log updates must hit disk before any of the data-file changes + * they describe do. + */ + recptr = BufferGetLSN(buf); + XLogFlush(recptr); + + /* + * Now it's safe to write buffer to disk. Note that no one else should + * have been able to write it while we were busy with log flushing because + * we have the io_in_progress lock. + */ + + /* To check if block content changes while flushing. - vadim 01/17/97 */ + LockBufHdr(buf); + buf->flags &= ~BM_JUST_DIRTIED; + UnlockBufHdr(buf); + + writeList[writeIndex].buf_id = buf_id; + writeList[writeIndex].reln = reln; + writeList[writeIndex].forkNum = buf->tag.forkNum; + writeList[writeIndex].blockNum = buf->tag.blockNum; + writeList[writeIndex].buffer = (char *) BufHdrGetBlock(buf); + writeList[writeIndex].callerBuf = callerBuf; + writeIndex++; + return TRUE; + } + + /* + * Flush a buffer, or prepare to flush a buffer in a batch if doBatch is + * true. Requires that bufHdr is already locked. Actually flush the + * batch of writes if we have reached now have batched_buffer_writes + * buffers in the batch. + */ + static void + FlushBufferBatched(volatile BufferDesc *bufHdr, SMgrRelation reln, bool doBatch) + { + PinBuffer_Locked(bufHdr); + if (doBatch) + { + retry: + Assert(batched_buffer_writes > 0); + if (writeIndex == 0) + { + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + } + else + { + /* + * If we are already holding some buffer content locks (because of + * batching multiple writes), then acquire the next content lock + * conditionally, so as to avoid deadlock. If we can't acquire + * it, then flush the current batch (which will reset writeIndex + * to zero as a consequence) and then retry (and now we won't have + * to acquire it conditionally). + */ + if (!LWLockConditionalAcquire(bufHdr->content_lock, + LW_SHARED)) + { + FlushBatch(true); + goto retry; + } + } + if (!FlushBufferPrepare(bufHdr->buf_id, bufHdr, + cksumBuf + writeIndex * BLCKSZ, reln)) + { + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true); + } + + /* + * If we now have a full batch, then flush it out the batch. + */ + if (writeIndex == batched_buffer_writes) + { + FlushBatch(true); + } + } + else + { + LWLockAcquire(bufHdr->content_lock, LW_SHARED); + FlushBuffer(bufHdr, reln); + LWLockRelease(bufHdr->content_lock); + UnpinBuffer(bufHdr, true); + } + } + + + /* * RelationGetNumberOfBlocks * Determines the current number of pages in the relation. */ *************** *** 2128,2137 **** PrintPinnedBufs(void) * used in any performance-critical code paths, so it's not worth * adding additional overhead to normal paths to make it go faster; * but see also DropRelFileNodeBuffers. * -------------------------------------------------------------------- */ void ! 
FlushRelationBuffers(Relation rel) { int i; volatile BufferDesc *bufHdr; --- 2530,2543 ---- * used in any performance-critical code paths, so it's not worth * adding additional overhead to normal paths to make it go faster; * but see also DropRelFileNodeBuffers. + * + * If needsDoubleWrite is true, then the changed buffers should use + * double-writing, if that option is enabled. Currently, relation + * changes that were not WAL-logged do not need double writes. * -------------------------------------------------------------------- */ void ! FlushRelationBuffers(Relation rel, bool needsDoubleWrite) { int i; volatile BufferDesc *bufHdr; *************** *** 2171,2176 **** FlushRelationBuffers(Relation rel) --- 2577,2584 ---- return; } + AllocCksumBuf(); + CheckDoubleWriteFile(); /* Make sure we can handle the pin inside the loop */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); *************** *** 2181,2195 **** FlushRelationBuffers(Relation rel) if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! PinBuffer_Locked(bufHdr); ! LWLockAcquire(bufHdr->content_lock, LW_SHARED); ! FlushBuffer(bufHdr, rel->rd_smgr); ! LWLockRelease(bufHdr->content_lock); ! UnpinBuffer(bufHdr, true); } else UnlockBufHdr(bufHdr); } } /* --------------------------------------------------------------------- --- 2589,2611 ---- if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! /* ! * Don't batch if the pages were not logged, since in that case we ! * don't want to do double writes, even if the double write option ! * is on. ! */ ! FlushBufferBatched(bufHdr, rel->rd_smgr, ! needsDoubleWrite && doubleWrites); } else UnlockBufHdr(bufHdr); } + if (needsDoubleWrite && doubleWrites && writeIndex > 0) + { + /* Write out any incomplete batch */ + FlushBatch(true); + } + FreeCksumBuf(); } /* --------------------------------------------------------------------- *************** *** 2213,2218 **** FlushDatabaseBuffers(Oid dbid) --- 2629,2636 ---- int i; volatile BufferDesc *bufHdr; + AllocCksumBuf(); + CheckDoubleWriteFile(); /* Make sure we can handle the pin inside the loop */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); *************** *** 2223,2237 **** FlushDatabaseBuffers(Oid dbid) if (bufHdr->tag.rnode.dbNode == dbid && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! PinBuffer_Locked(bufHdr); ! LWLockAcquire(bufHdr->content_lock, LW_SHARED); ! FlushBuffer(bufHdr, NULL); ! LWLockRelease(bufHdr->content_lock); ! UnpinBuffer(bufHdr, true); } else UnlockBufHdr(bufHdr); } } /* --- 2641,2657 ---- if (bufHdr->tag.rnode.dbNode == dbid && (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) { ! FlushBufferBatched(bufHdr, NULL, doubleWrites); } else UnlockBufHdr(bufHdr); } + if (doubleWrites && writeIndex > 0) + { + /* Write out any incomplete batch */ + FlushBatch(true); + } + FreeCksumBuf(); } /* *************** *** 2648,2655 **** WaitIO(volatile BufferDesc *buf) static bool StartBufferIO(volatile BufferDesc *buf, bool forInput) { - Assert(!InProgressBuf); - for (;;) { /* --- 3068,3073 ---- *************** *** 2688,2694 **** StartBufferIO(volatile BufferDesc *buf, bool forInput) UnlockBufHdr(buf); ! InProgressBuf = buf; IsForInput = forInput; return true; --- 3106,3112 ---- UnlockBufHdr(buf); ! 
InProgressBuf[InProgressIndex++] = buf; IsForInput = forInput; return true; *************** *** 2715,2721 **** static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits) { ! Assert(buf == InProgressBuf); LockBufHdr(buf); --- 3133,3140 ---- TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits) { ! /* BufferSync() will release batched buffers in reverse order */ ! Assert(InProgressBuf[InProgressIndex - 1] == buf); LockBufHdr(buf); *************** *** 2727,2734 **** TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, UnlockBufHdr(buf); ! InProgressBuf = NULL; ! LWLockRelease(buf->io_in_progress_lock); } --- 3146,3153 ---- UnlockBufHdr(buf); ! InProgressIndex--; ! InProgressBuf[InProgressIndex] = NULL; LWLockRelease(buf->io_in_progress_lock); } *************** *** 2744,2753 **** TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, void AbortBufferIO(void) { ! volatile BufferDesc *buf = InProgressBuf; ! if (buf) { /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that --- 3163,3175 ---- void AbortBufferIO(void) { ! volatile BufferDesc *buf; ! int i; ! for (i = InProgressIndex - 1; i >= 0; i--) { + buf = InProgressBuf[i]; + /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that *************** *** 2789,2794 **** AbortBufferIO(void) --- 3211,3217 ---- } TerminateBufferIO(buf, false, BM_IO_ERROR); } + InProgressIndex = 0; } /* *** a/src/backend/storage/file/fd.c --- b/src/backend/storage/file/fd.c *************** *** 55,60 **** --- 55,61 ---- #include "catalog/pg_tablespace.h" #include "storage/fd.h" #include "storage/ipc.h" + #include "storage/smgr.h" #include "utils/guc.h" #include "utils/resowner.h" *************** *** 1335,1340 **** retry: --- 1336,1565 ---- return returnCode; } + struct io_iocb_common + { + void *buf; + unsigned nbytes; + long long offset; + }; + + struct iocb + { + void *data; + int aio_fildes; + union + { + struct io_iocb_common c; + } u; + }; + + /* Array of iocbs used for io_submit */ + struct iocb ioc[MAX_BATCHED_WRITES]; + int iocIndex = 0; + + /* Array of iovecs use for iocbs and for writev. One extra entry for + * double-write header. */ + struct iovec iov[MAX_BATCHED_WRITES + 1]; + int iovIndex = 0; + + /* + * Return number of bytes described by the list of memory regions (struct + * iovec elements) referenced by ioc. + */ + static int + iovlen(struct iocb *ioc) + { + int i, + len; + struct iovec *iov = (struct iovec *) ioc->u.c.buf; + + len = 0; + for (i = 0; i < ioc->u.c.nbytes; i++) + { + len += iov->iov_len; + iov++; + } + return len; + } + + /* + * Flush existing batched IOs, doing a write to a double-write file first + * if double_writes option is turned on. When called, iov[] lists all + * the memory regions being written, in order, and ioc[] groups together + * consecutive iov elements which go to consecutive locations on disk. + * Each ioc represents a set of memory regions that can be written to + * disk in a single write using writev(). 
+ */ + static int + FlushIOs(File doubleWriteFile) + { + int returnCode; + int i, + j; + + if (doubleWriteFile > 0) + { + returnCode = FileAccess(doubleWriteFile); + if (returnCode < 0) + return returnCode; + returnCode = lseek(VfdCache[doubleWriteFile].fd, 0, SEEK_SET); + if (returnCode < 0) + { + elog(LOG, "lseek error errno %d, rc %d", + errno, returnCode); + return returnCode; + } + + /* + * Write out all the blocks sequentially in double-write file, + * starting with the double-write file header describing all the + * buffers. + */ + returnCode = writev(VfdCache[doubleWriteFile].fd, &iov[0], iovIndex); + if (returnCode < 0) + { + elog(LOG, "writev to tmp, errno %d, rc %d, iovIndex %d", + errno, returnCode, iovIndex); + return returnCode; + } + returnCode = fdatasync(VfdCache[doubleWriteFile].fd); + if (returnCode < 0) + { + elog(LOG, "fdatasync error errno %d, rc %d", errno, + returnCode); + return returnCode; + } + } + + for (i = 0; i < iocIndex; i++) + { + returnCode = lseek(ioc[i].aio_fildes, ioc[i].u.c.offset, SEEK_SET); + if (returnCode < 0) + { + elog(LOG, "lseek error errno %d, rc %d, offset %Ld", + errno, returnCode, ioc[i].u.c.offset); + return returnCode; + } + /* Use writev to do one IO per consecutive blocks to the disk. */ + returnCode = writev(ioc[i].aio_fildes, ioc[i].u.c.buf, + ioc[i].u.c.nbytes); + if (returnCode < 0) + { + elog(LOG, "writev error errno %d, rc %d", errno, returnCode); + return returnCode; + } + } + if (doubleWriteFile > 0) + { + /* + * If we are doing double writes, must make sure that the blocks was + * just wrote are forced to disk, so we can re-use double-write buffer + * next time. + */ + for (i = 0; i < iocIndex; i++) + { + for (j = 0; j < i; j++) + { + if (ioc[j].aio_fildes == ioc[i].aio_fildes) + { + break; + } + } + if (j == i) + { + returnCode = fdatasync(ioc[i].aio_fildes); + if (returnCode < 0) + { + elog(LOG, "fdatasync error errno %d, rc %d", errno, + returnCode); + return returnCode; + } + } + } + } + return 0; + } + + /* + * Write out a batch of memory regions (Postgres buffers) to locations in + * files, as specified in writeList. If doubleWriteFile is >= 0, then + * also do double writes to the specified file (so full_page_writes can + * be avoided). + */ + int + FileBwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf) + { + int returnCode; + int i; + + for (i = 0; i < writeLen; i++) + { + struct SMgrWriteList *w = &(writeList[i]); + + Assert(FileIsValid(w->fd)); + DO_DB(elog(LOG, "FileBwrite: %d (%s) " INT64_FORMAT " %d %p", + file, VfdCache[w->fd].fileName, + (int64) VfdCache[w->fd].seekPos, + w->len, w->callerBuf)); + + returnCode = FileAccess(w->fd); + if (returnCode < 0) + return returnCode; + } + + iovIndex = 0; + iocIndex = 0; + + if (doubleWriteFile > 0) + { + /* + * Write out the double-write header (which lists all the blocks) in a + * 4K chunk at the beginning of the double-write file. + */ + iov[iovIndex].iov_base = doubleBuf; + iov[iovIndex].iov_len = DOUBLE_WRITE_HEADER_SIZE; + iovIndex++; + } + + /* + * Convert the list of writes (writeList) into an iovec array iov that + * describes all the memory regions being written, and an ioc array for + * each consecutive set of iov elements which are going to the consecutive + * locations in the same file. Each ioc element describes a set of memory + * regions that can be written to the disk in a single write usinf + * writev() (also known as a scatter-gather array). 
+ */ + for (i = 0; i < writeLen; i++) + { + struct SMgrWriteList *w = &(writeList[i]); + struct iocb *last; + + iov[iovIndex].iov_base = w->callerBuf; + iov[iovIndex].iov_len = w->len; + last = (iocIndex > 0) ? &(ioc[iocIndex - 1]) : NULL; + if (iocIndex > 0 && w->seekPos == last->u.c.offset + iovlen(last) && + last->aio_fildes == VfdCache[w->fd].fd) + { + last->u.c.nbytes++; + } + else + { + ioc[iocIndex].aio_fildes = VfdCache[w->fd].fd; + ioc[iocIndex].u.c.buf = (void *) &iov[iovIndex]; + ioc[iocIndex].u.c.nbytes = 1; + ioc[iocIndex].u.c.offset = w->seekPos; + iocIndex++; + } + iovIndex++; + VfdCache[w->fd].seekPos = FileUnknownPos; + } + + returnCode = FlushIOs(doubleWriteFile); + + return returnCode; + } + int FileSync(File file) { *** a/src/backend/storage/lmgr/lwlock.c --- b/src/backend/storage/lmgr/lwlock.c *************** *** 81,88 **** NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; * during error recovery. The maximum size could be determined at runtime * if necessary, but it seems unlikely that more than a few locks could * ever be held simultaneously. */ ! #define MAX_SIMUL_LWLOCKS 100 static int num_held_lwlocks = 0; static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; --- 81,91 ---- * during error recovery. The maximum size could be determined at runtime * if necessary, but it seems unlikely that more than a few locks could * ever be held simultaneously. + * + * Must be at least 2 * MAX_BATCHED_WRITES, since checkpointer/bgwriter + * holds 2 lwlocks for each buffer that it is batching. */ ! #define MAX_SIMUL_LWLOCKS (Max(100, (2 * MAX_BATCHED_WRITES))) static int num_held_lwlocks = 0; static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; *** a/src/backend/storage/smgr/md.c --- b/src/backend/storage/smgr/md.c *************** *** 745,750 **** mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, --- 745,799 ---- } /* + * Write out a list of buffers, as specified in writeList. If + * doubleWriteFile is >= 0, then also do double writes to the specified + * file (so full_page_writes can be avoided). + */ + void + mdbwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf) + { + off_t seekpos; + int nbytes; + MdfdVec *v; + File fd; + int i; + + for (i = 0; i < writeLen; i++) + { + struct SMgrWriteList *w = &(writeList[i]); + + /* This assert is too expensive to have on normally ... */ + #ifdef CHECK_WRITE_VS_EXTEND + Assert(w->blockNum < mdnblocks(w->reln, w->forknum)); + #endif + v = _mdfd_getseg(w->reln, w->forkNum, w->blockNum, false, EXTENSION_FAIL); + fd = v->mdfd_vfd; + + seekpos = (off_t) BLCKSZ *(w->blockNum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + w->fd = fd; + w->seekPos = seekpos; + w->len = BLCKSZ; + } + + nbytes = FileBwrite(writeLen, writeList, doubleWriteFile, doubleBuf); + if (nbytes < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("FileBwrite error %d", errno))); + } + + /* + * XXX register_dirty_segment() call is not needed if we only do batched + * writes for the double_write option. + */ + } + + + /* * mdnblocks() -- Get the number of blocks stored in a relation. 
* * Important side effect: all active segments of the relation are opened *** a/src/backend/storage/smgr/smgr.c --- b/src/backend/storage/smgr/smgr.c *************** *** 15,22 **** --- 15,25 ---- * *------------------------------------------------------------------------- */ + #include + #include "postgres.h" + #include "catalog/catalog.h" #include "commands/tablespace.h" #include "storage/bufmgr.h" #include "storage/ipc.h" *************** *** 53,58 **** typedef struct f_smgr --- 56,63 ---- BlockNumber blocknum, char *buffer); void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); + void (*smgr_bwrite) (int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); *************** *** 66,72 **** typedef struct f_smgr static const f_smgr smgrsw[] = { /* magnetic disk */ {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, ! mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync, mdpreckpt, mdsync, mdpostckpt } }; --- 71,77 ---- static const f_smgr smgrsw[] = { /* magnetic disk */ {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, ! mdprefetch, mdread, mdwrite, mdbwrite, mdnblocks, mdtruncate, mdimmedsync, mdpreckpt, mdsync, mdpostckpt } }; *************** *** 79,87 **** static const int NSmgr = lengthof(smgrsw); --- 84,100 ---- */ static HTAB *SMgrRelationHash = NULL; + /* Page checksumming. */ + static uint64 tempbuf[BLCKSZ / sizeof(uint64)]; + extern bool page_checksum; + + #define INVALID_CKSUM 0x1b0af034 + /* local function prototypes */ static void smgrshutdown(int code, Datum arg); + /* Buffer used to write the double-write header */ + static char doubleBuf[DOUBLE_WRITE_HEADER_SIZE]; /* * smgrinit(), smgrshutdown() -- Initialize or shut down storage *************** *** 381,386 **** smgrdounlink(SMgrRelation reln, ForkNumber forknum, bool isRedo) --- 394,454 ---- } /* + * The initial value when computing the checksum for a data page. + */ + static inline uint64 + ChecksumInit(SMgrRelation reln, ForkNumber f, BlockNumber b) + { + return b + f; + } + + /* + * Compute a checksum of buffer (with length len), using initial value + * cksum. We use a relatively simple checksum calculation to avoid + * overhead, but could replace with some kind of CRC calculation. + */ + static inline uint32 + ComputeChecksum(uint64 *buffer, uint32 len, uint64 cksum) + { + int i; + + for (i = 0; i < len / sizeof(uint64); i += 4) + { + cksum += (cksum << 5) + *buffer; + cksum += (cksum << 5) + *(buffer + 1); + cksum += (cksum << 5) + *(buffer + 2); + cksum += (cksum << 5) + *(buffer + 3); + buffer += 4; + } + cksum = (cksum & 0xFFFFFFFF) + (cksum >> 32); + return cksum; + } + + /* + * Copy buffer to dst and compute the checksum during the copy (so that + * the checksum is correct for the final contents of dst). 
+ */ + static inline uint32 + CopyAndComputeChecksum(uint64 *dst, volatile uint64 *buffer, + uint32 len, uint64 cksum) + { + int i; + + for (i = 0; i < len / sizeof(uint64); i += 4) + { + cksum += (cksum << 5) + (*dst = *buffer); + cksum += (cksum << 5) + (*(dst + 1) = *(buffer + 1)); + cksum += (cksum << 5) + (*(dst + 2) = *(buffer + 2)); + cksum += (cksum << 5) + (*(dst + 3) = *(buffer + 3)); + dst += 4; + buffer += 4; + } + cksum = (cksum & 0xFFFFFFFF) + (cksum >> 32); + return cksum; + } + + + /* * smgrextend() -- Add a new block to a file. * * The semantics are nearly the same as smgrwrite(): write at the *************** *** 393,400 **** void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync) { (*(smgrsw[reln->smgr_which].smgr_extend)) (reln, forknum, blocknum, ! buffer, skipFsync); } /* --- 461,490 ---- smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync) { + PageHeader p; + + Assert(PageGetPageLayoutVersion(((PageHeader) buffer)) == PG_PAGE_LAYOUT_VERSION || + PageIsNew(buffer)); + if (page_checksum) + { + p = (PageHeader) tempbuf; + ((PageHeader) buffer)->cksum = 0; + + /* + * We copy and compute the checksum, and then write out the data from + * the copy, so that we avoid any problem with hint bits changing + * after we compute the checksum. + */ + p->cksum = CopyAndComputeChecksum(tempbuf, (uint64 *) buffer, BLCKSZ, + ChecksumInit(reln, forknum, blocknum)); + } + else + { + p = (PageHeader) buffer; + p->cksum = INVALID_CKSUM; + } (*(smgrsw[reln->smgr_which].smgr_extend)) (reln, forknum, blocknum, ! (char *) p, skipFsync); } /* *************** *** 416,424 **** smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) */ void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ! char *buffer) { (*(smgrsw[reln->smgr_which].smgr_read)) (reln, forknum, blocknum, buffer); } /* --- 506,543 ---- */ void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ! char *buffer, bool *cksumMismatch) { + PageHeader p = (PageHeader) buffer; + (*(smgrsw[reln->smgr_which].smgr_read)) (reln, forknum, blocknum, buffer); + Assert(PageIsNew(p) || PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION); + if (page_checksum && p->cksum != INVALID_CKSUM) + { + const uint32 diskCksum = p->cksum; + uint32 cksum; + + p->cksum = 0; + cksum = ComputeChecksum((uint64 *) buffer, BLCKSZ, + ChecksumInit(reln, forknum, blocknum)); + if (cksum != diskCksum) + { + if (cksumMismatch != NULL) + { + *cksumMismatch = TRUE; + return; + } + ereport(PANIC, (0, errmsg("checksum mismatch: disk has %#x, should be %#x\n" + "filename %s, BlockNum %u, block specifier %d/%d/%d/%d/%u", + diskCksum, (uint32) cksum, + relpath(reln->smgr_rnode, forknum), + blocknum, + reln->smgr_rnode.node.spcNode, + reln->smgr_rnode.node.dbNode, + reln->smgr_rnode.node.relNode, + forknum, blocknum))); + } + } } /* *************** *** 440,447 **** void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync) { (*(smgrsw[reln->smgr_which].smgr_write)) (reln, forknum, blocknum, ! 
buffer, skipFsync); } /* --- 559,673 ---- smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync) { + PageHeader p; + + if (page_checksum) + { + p = (PageHeader) tempbuf; + ((PageHeader) buffer)->cksum = 0; + + /* + * We copy and compute the checksum, and then write out the data from + * the copy, so that we avoid any problem with hint bits changing + * after we compute the checksum. + */ + p->cksum = CopyAndComputeChecksum(tempbuf, (uint64 *) buffer, BLCKSZ, + ChecksumInit(reln, forknum, blocknum)); + } + else + { + p = (PageHeader) buffer; + p->cksum = INVALID_CKSUM; + } + Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION); (*(smgrsw[reln->smgr_which].smgr_write)) (reln, forknum, blocknum, ! (char *) p, skipFsync); ! } ! ! /* ! * Write out a list of buffers, as specified in writeList. If ! * doubleWriteFile is >= 0, then also do double writes to the specified ! * file (so full_page_writes can be avoided). ! */ ! void ! smgrbwrite(int writeLen, struct SMgrWriteList *writeList, ! File doubleWriteFile) ! { ! PageHeader p = NULL; ! int i; ! ! if (page_checksum) ! { ! for (i = 0; i < writeLen; i++) ! { ! struct SMgrWriteList *w = &(writeList[i]); ! ! p = (PageHeader) w->callerBuf; ! if (p == NULL) ! { ! /* Use tempbuf in the 1-page case (BufferAlloc) */ ! Assert(writeLen == 1); ! p = (PageHeader) tempbuf; ! w->callerBuf = (char *) tempbuf; ! } ! ((PageHeader) w->buffer)->cksum = 0; ! p->cksum = CopyAndComputeChecksum((uint64 *) p, ! (uint64 *) w->buffer, ! BLCKSZ, ! ChecksumInit(w->reln, ! w->forkNum, ! w->blockNum)); ! Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION); ! } ! } ! else ! { ! for (i = 0; i < writeLen; i++) ! { ! p = (PageHeader) writeList[i].buffer; ! writeList[i].callerBuf = (char *) p; ! p->cksum = INVALID_CKSUM; ! Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION); ! } ! } ! ! if (doubleWriteFile > 0) ! { ! /* ! * Set up the initial double-write page that lists all the buffers ! * that will be written to the double write file This list includes ! * the checksums of all the buffers and is checksummed itself. ! */ ! struct DoubleBufHeader *hdr = (struct DoubleBufHeader *) doubleBuf; ! struct DoubleBufItem *item = hdr->items; ! ! /* ! * The double-write header size should be big enough to contain info ! * for up to MAX_BATCHED_WRITES buffers. ! */ ! Assert(sizeof(struct DoubleBufHeader) + MAX_BATCHED_WRITES * sizeof(struct DoubleBufItem) <= DOUBLE_WRITE_HEADER_SIZE); ! for (i = 0; i < writeLen; i++) ! { ! item->rnode = writeList[i].reln->smgr_rnode; ! item->forkNum = writeList[i].forkNum; ! item->blockNum = writeList[i].blockNum; ! p = (PageHeader) writeList[i].callerBuf; ! item->cksum = p->cksum; ! item->pd_lsn = p->pd_lsn; ! item++; ! } ! hdr->writeLen = writeLen; ! hdr->cksum = ComputeChecksum((uint64 *) hdr, DOUBLE_WRITE_HEADER_SIZE, 0); ! } ! ! (*(smgrsw[writeList[0].reln->smgr_which].smgr_bwrite)) (writeLen, writeList, ! doubleWriteFile, doubleBuf); ! if (doubleWriteFile > 0) ! { ! /* Zero out part of header that we filled in. */ ! memset(doubleBuf, 0, ! (char *) &(((struct DoubleBufHeader *) doubleBuf)->items[writeLen]) - doubleBuf); ! } } /* *************** *** 561,563 **** smgrpostckpt(void) --- 787,949 ---- (*(smgrsw[i].smgr_post_ckpt)) (); } } + + extern char *double_write_directory; + + /* + * Return the name of the double write file. Caller must use pfree(). 
+ */ + char * + DoubleWriteFileName() + { + char *name; + + if (double_write_directory != NULL) + { + name = palloc(strlen(double_write_directory) + strlen("/double") + 1); + sprintf(name, "%s/double", double_write_directory); + } + else + { + name = pstrdup("base/double"); + } + return name; + } + + /* + * Called by postmaster at startup during recovery to read double-write + * file and see if there are any data blocks to be recovered. + */ + void + RecoverDoubleWriteFile() + { + char *name; + struct stat stat_buf; + File fd; + int r; + struct DoubleBufHeader *hdr; + int i; + uint32 savedCksum; + + name = DoubleWriteFileName(); + if (stat(name, &stat_buf) == -1) + { + elog(LOG, "No double-write file"); + pfree(name); + return; + } + fd = PathNameOpenFile(name, O_RDONLY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + elog(PANIC, "Double-write file %s exists but can't be opened", name); + } + + r = FileRead(fd, doubleBuf, DOUBLE_WRITE_HEADER_SIZE); + if (r < 0) + goto badformat; + + hdr = (struct DoubleBufHeader *) doubleBuf; + if (hdr->writeLen == 0 && hdr->cksum == 0) + { + elog(LOG, "Double-write file %s is zeroed out", name); + FileClose(fd); + pfree(name); + return; + } + if (hdr->writeLen < 1 || hdr->writeLen > MAX_BATCHED_WRITES) + goto badformat; + + savedCksum = hdr->cksum; + hdr->cksum = 0; + if (savedCksum != + ComputeChecksum((uint64 *) hdr, DOUBLE_WRITE_HEADER_SIZE, 0)) + goto badformat; + + for (i = 0; i < hdr->writeLen; i++) + { + struct DoubleBufItem *it = &(hdr->items[i]); + SMgrRelation smgr; + bool mismatch; + PageHeader p; + + /* + * For each block described in double-write file header, see if the + * block in the database file has a checksum mismatch. If so, restore + * the block from the double-write file if that entry has correct + * checksum. + */ + smgr = smgropen(it->rnode.node, InvalidBackendId); + mismatch = false; + + /* + * The block may no longer exist if relation was deleted/truncated + * after the last double-write. + */ + if (!smgrexists(smgr, it->forkNum) || + it->blockNum >= smgrnblocks(smgr, it->forkNum)) + { + elog(LOG, "Block %s/%d in slot %d of double-write file no longer exists, skipping", + relpath(it->rnode, it->forkNum), it->blockNum, i); + continue; + } + + smgrread(smgr, it->forkNum, it->blockNum, (char *) tempbuf, &mismatch); + if (mismatch) + { + /* + * The corresponding data block has a checksum error, and is + * likely a torn page. See if the block in the double-write file + * has the correct checksum. If so, we can correct the data page + * from the block in the double-write file. + */ + FileSeek(fd, DOUBLE_WRITE_HEADER_SIZE + i * BLCKSZ, SEEK_SET); + FileRead(fd, (char *) tempbuf, BLCKSZ); + p = (PageHeader) tempbuf; + savedCksum = p->cksum; + p->cksum = 0; + if (savedCksum != it->cksum || + savedCksum != ComputeChecksum((uint64 *) tempbuf, BLCKSZ, + ChecksumInit(smgr, it->forkNum, + it->blockNum))) + { + elog(LOG, "Block %s/%d has checksum error, but can't be fixed, because slot %d of double-write file %s looks invalid", + relpath(it->rnode, it->forkNum), it->blockNum, i, name); + } + else + { + /* + * Correct the block in the data file from the block in the + * double-write file. 
+ 				 */
+ 				Assert(XLByteEQ(p->pd_lsn, it->pd_lsn));
+ 				smgrwrite(smgr, it->forkNum, it->blockNum, (char *) tempbuf, false);
+ 				elog(LOG, "Fixed block %s/%d (which had a checksum error) from double-write file %s slot %d",
+ 					 relpath(it->rnode, it->forkNum), it->blockNum, name, i);
+ 			}
+ 		}
+ 		else
+ 		{
+ 			elog(DEBUG1, "Skipping slot %d of double-write file because block %s/%d is correct",
+ 				 i, relpath(it->rnode, it->forkNum), it->blockNum);
+ 		}
+ 		smgrclose(smgr);
+ 	}
+ 	FileClose(fd);
+ 
+ 	/*
+ 	 * Remove the double-write file, so it can't be re-applied if a crash
+ 	 * happens during recovery.
+ 	 */
+ 	if (unlink(name) == -1)
+ 	{
+ 		elog(PANIC, "Failed to remove double-write file");
+ 	}
+ 	pfree(name);
+ 	return;
+ 
+ badformat:
+ 	elog(LOG, "Double-write file %s has bad format", name);
+ 	FileClose(fd);
+ 	pfree(name);
+ 	return;
+ }
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 130,135 **** extern int CommitSiblings;
--- 130,137 ----
  extern char *default_tablespace;
  extern char *temp_tablespaces;
  extern bool synchronize_seqscans;
+ bool doubleWrites;
+ char *double_write_directory;
  extern bool fullPageWrites;
  extern int ssl_renegotiation_limit;
  extern char *SSLCipherSuites;
***************
*** 419,424 **** bool default_with_oids = false;
--- 421,428 ----
  bool SQL_inheritance = true;
  
  bool Password_encryption = true;
  
+ bool page_checksum = true;
+ 
  int log_min_error_statement = ERROR;
  int log_min_messages = WARNING;
***************
*** 445,450 **** int tcp_keepalives_idle;
--- 449,456 ----
  int tcp_keepalives_interval;
  int tcp_keepalives_count;
  
+ int batched_buffer_writes = 0;
+ 
  /*
   * These variables are all dummies that don't do anything, except in some
   * cases provide the value for SHOW to display.  The real state is elsewhere
***************
*** 816,821 **** static struct config_bool ConfigureNamesBool[] =
--- 822,835 ----
  		NULL, NULL, NULL
  	},
  	{
+ 		{"double_writes", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ 			gettext_noop("Writes data pages first to a double-write file before writing them to the database file."),
+ 			NULL
+ 		},
+ 		&doubleWrites,
+ 		false, NULL, NULL
+ 	},
+ 	{
  		{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
  			gettext_noop("Continues processing past damaged page headers."),
  			gettext_noop("Detection of a damaged page header normally causes PostgreSQL to "
***************
*** 1438,1443 **** static struct config_bool ConfigureNamesBool[] =
--- 1452,1465 ----
  		NULL, NULL, NULL
  	},
  
+ 	{
+ 		{"page_checksum", PGC_POSTMASTER, CUSTOM_OPTIONS,
+ 			gettext_noop("Enables disk page checksumming."),
+ 			NULL
+ 		},
+ 		&page_checksum, true, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
***************
*** 2370,2375 **** static struct config_int ConfigureNamesInt[] =
--- 2392,2407 ----
  		NULL, NULL, NULL
  	},
  
+ 	{
+ 		{"batched_buffer_writes", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ 			gettext_noop("Number of buffers the checkpointer/bgwriter writes in a batch."),
+ 			NULL,
+ 			GUC_NOT_IN_SAMPLE
+ 		},
+ 		&batched_buffer_writes,
+ 		0, 0, MAX_BATCHED_WRITES, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, 0, NULL, NULL, NULL
***************
*** 3018,3023 **** static struct config_string ConfigureNamesString[] =
--- 3050,3065 ----
  		check_application_name, assign_application_name, NULL
  	},
  
+ 	{
+ 		{"double_write_directory", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ 			gettext_noop("Location of the double-write file."),
+ 			NULL,
+ 			GUC_REPORT | GUC_NOT_IN_SAMPLE
+ 		},
+ 		&double_write_directory,
+ 		NULL, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0,
NULL, NULL}, NULL, NULL, NULL, NULL, NULL *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 521,526 **** --- 521,527 ---- # LOCK MANAGEMENT #------------------------------------------------------------------------------ + #page_checksum = on #deadlock_timeout = 1s #max_locks_per_transaction = 64 # min 10 # (change requires restart) *************** *** 564,566 **** --- 565,570 ---- #------------------------------------------------------------------------------ # Add settings for extensions here + #double_writes = off + #double_write_directory = '' + #batched_buffer_writes = 0 *** a/src/backend/utils/resowner/resowner.c --- b/src/backend/utils/resowner/resowner.c *************** *** 517,523 **** ResourceOwnerEnlargeBuffers(ResourceOwner owner) if (owner->buffers == NULL) { ! newmax = 16; owner->buffers = (Buffer *) MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer)); owner->maxbuffers = newmax; --- 517,527 ---- if (owner->buffers == NULL) { ! /* ! * This should be at least MAX_BATCHED_WRITES, since bgwriter can lock ! * that many buffers at one time. ! */ ! newmax = MAX_BATCHED_WRITES; owner->buffers = (Buffer *) MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer)); owner->maxbuffers = newmax; *** a/src/include/access/heapam.h --- b/src/include/access/heapam.h *************** *** 124,130 **** extern void simple_heap_update(Relation relation, ItemPointer otid, extern void heap_markpos(HeapScanDesc scan); extern void heap_restrpos(HeapScanDesc scan); ! extern void heap_sync(Relation relation); extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec); --- 124,130 ---- extern void heap_markpos(HeapScanDesc scan); extern void heap_restrpos(HeapScanDesc scan); ! extern void heap_sync(Relation relation, bool needsDoubleWrite); extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec); *** a/src/include/c.h --- b/src/include/c.h *************** *** 735,740 **** typedef NameData *Name; --- 735,742 ---- #define PG_TEXTDOMAIN(domain) (domain "-" PG_MAJORVERSION) #endif + /* Maximum number of buffers that can be written in a batch. */ + #define MAX_BATCHED_WRITES 64 /* ---------------------------------------------------------------- * Section 8: system-specific hacks *** a/src/include/storage/bufmgr.h --- b/src/include/storage/bufmgr.h *************** *** 183,189 **** extern void CheckPointBuffers(int flags); extern BlockNumber BufferGetBlockNumber(Buffer buffer); extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum); ! extern void FlushRelationBuffers(Relation rel); extern void FlushDatabaseBuffers(Oid dbid); extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, BlockNumber firstDelBlock); --- 183,189 ---- extern BlockNumber BufferGetBlockNumber(Buffer buffer); extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum); ! 
extern void FlushRelationBuffers(Relation rel, bool needsDoubleWrite); extern void FlushDatabaseBuffers(Oid dbid); extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, BlockNumber firstDelBlock); *** a/src/include/storage/bufpage.h --- b/src/include/storage/bufpage.h *************** *** 132,137 **** typedef struct PageHeaderData --- 132,138 ---- LocationIndex pd_special; /* offset to start of special space */ uint16 pd_pagesize_version; TransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */ + uint32 cksum; /* page checksum */ ItemIdData pd_linp[1]; /* beginning of line pointer array */ } PageHeaderData; *************** *** 154,160 **** typedef PageHeaderData *PageHeader; * tuple? */ #define PD_ALL_VISIBLE 0x0004 /* all tuples on page are visible to * everyone */ - #define PD_VALID_FLAG_BITS 0x0007 /* OR of all valid pd_flags bits */ /* --- 155,160 ---- *************** *** 165,172 **** typedef PageHeaderData *PageHeader; * Release 8.3 uses 4; it changed the HeapTupleHeader layout again, and * added the pd_flags field (by stealing some bits from pd_tli), * as well as adding the pd_prune_xid field (which enlarges the header). */ ! #define PG_PAGE_LAYOUT_VERSION 4 /* ---------------------------------------------------------------- --- 165,173 ---- * Release 8.3 uses 4; it changed the HeapTupleHeader layout again, and * added the pd_flags field (by stealing some bits from pd_tli), * as well as adding the pd_prune_xid field (which enlarges the header). + * Release x.y uses 5; we added checksums to heap/index/fsm files. */ ! #define PG_PAGE_LAYOUT_VERSION 5 /* ---------------------------------------------------------------- *** a/src/include/storage/fd.h --- b/src/include/storage/fd.h *************** *** 53,63 **** typedef int File; --- 53,70 ---- /* GUC parameter */ extern int max_files_per_process; + /* + * Size of the header on the double-write file that describes the buffers + * currently written to the double-write file. 
+ */ + #define DOUBLE_WRITE_HEADER_SIZE 4096 /* * prototypes for functions in fd.c */ + struct SMgrWriteList; + /* Operations on virtual Files --- equivalent to Unix kernel file ops */ extern File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode); extern File OpenTemporaryFile(bool interXact); *************** *** 67,72 **** extern int FilePrefetch(File file, off_t offset, int amount); --- 74,81 ---- extern int FileRead(File file, char *buffer, int amount); extern int FileWrite(File file, char *buffer, int amount); extern int FileSync(File file); + extern int FileBwrite(int writeLen, struct SMgrWriteList *writeList, + File doubleWriteFile, char *doubleBuf); extern off_t FileSeek(File file, off_t offset, int whence); extern int FileTruncate(File file, off_t offset); extern char *FilePathName(File file); *** a/src/include/storage/lwlock.h --- b/src/include/storage/lwlock.h *************** *** 79,84 **** typedef enum LWLockId --- 79,85 ---- SerializablePredicateLockListLock, OldSerXidLock, SyncRepLock, + DoubleWriteLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, *** a/src/include/storage/smgr.h --- b/src/include/storage/smgr.h *************** *** 15,22 **** --- 15,24 ---- #define SMGR_H #include "fmgr.h" + #include "fd.h" #include "storage/block.h" #include "storage/relfilenode.h" + #include "access/xlogdefs.h" /* *************** *** 71,76 **** typedef SMgrRelationData *SMgrRelation; --- 73,123 ---- #define SmgrIsTemp(smgr) \ ((smgr)->smgr_rnode.backend != InvalidBackendId) + /* + * Element of an array specifying a buffer write in a list of buffer + * writes being batched. + */ + struct SMgrWriteList + { + int buf_id; + SMgrRelation reln; + ForkNumber forkNum; + BlockNumber blockNum; + char *buffer; + char *callerBuf; + + /* Filled in by mdawrite */ + File fd; + off_t seekPos; + int len; + }; + + /* + * Description of one buffer in the double-write file, as listed in the + * header. + */ + struct DoubleBufItem + { + /* Specification of where this buffer is in the database */ + RelFileNodeBackend rnode; /* physical relation identifier */ + ForkNumber forkNum; + BlockNumber blockNum; /* blknum relative to begin of reln */ + /* Checksum of the buffer. */ + int32 cksum; + /* LSN of the buffer */ + XLogRecPtr pd_lsn; + }; + + /* Format of the header of the double-write file */ + struct DoubleBufHeader + { + uint32 cksum; + int32 writeLen; + uint32 pad1; + uint32 pad2; + struct DoubleBufItem items[0]; + }; + extern void smgrinit(void); extern SMgrRelation smgropen(RelFileNode rnode, BackendId backend); extern void smgrsettransient(SMgrRelation reln); *************** *** 87,95 **** extern void smgrextend(SMgrRelation reln, ForkNumber forknum, extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void smgrread(SMgrRelation reln, ForkNumber forknum, ! BlockNumber blocknum, char *buffer); extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); --- 134,144 ---- extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void smgrread(SMgrRelation reln, ForkNumber forknum, ! 
BlockNumber blocknum, char *buffer, bool *cksumMismatch);
  extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
  		  BlockNumber blocknum, char *buffer, bool skipFsync);
+ extern void smgrbwrite(int writeLen, struct SMgrWriteList *writeList,
+ 		   File doubleWriteFile);
  extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
  extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
  		   BlockNumber nblocks);
***************
*** 98,103 **** extern void smgrpreckpt(void);
--- 147,155 ----
  extern void smgrsync(void);
  extern void smgrpostckpt(void);
+ 
+ extern char *DoubleWriteFileName(void);
+ extern void RecoverDoubleWriteFile(void);
+ 
  
  /* internals: move me elsewhere -- ay 7/94 */
***************
*** 115,120 **** extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
--- 167,174 ----
  			 char *buffer);
  extern void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  	   char *buffer, bool skipFsync);
+ extern void mdbwrite(int writeLen, struct SMgrWriteList *writeList,
+ 		 File doubleWriteFile, char *doubleBuf);
  extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
  extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
  		   BlockNumber nblocks);
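
To illustrate the torn-page detection that the code above relies on, here is a small standalone sketch (not part of the patch; the helper names and fill pattern are illustrative only). It mirrors the shift-add checksum used by CopyAndComputeChecksum()/ComputeChecksum(), seeded with 0 as for the double-write header rather than with ChecksumInit(), and shows where a replacement page would be read from in the double-write file: a DOUBLE_WRITE_HEADER_SIZE-byte header followed by one BLCKSZ page per slot, as in RecoverDoubleWriteFile().

/* Standalone illustration; compile with any C99 compiler. */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define BLCKSZ                   8192
#define DOUBLE_WRITE_HEADER_SIZE 4096

/* Shift-add checksum over 64-bit words, folded to 32 bits (as in the patch). */
static uint32_t
compute_checksum(const uint64_t *buf, uint32_t len, uint64_t cksum)
{
	uint32_t	i;

	for (i = 0; i < len / sizeof(uint64_t); i++)
		cksum += (cksum << 5) + buf[i];
	return (uint32_t) ((cksum & 0xFFFFFFFF) + (cksum >> 32));
}

int
main(void)
{
	static uint64_t page[BLCKSZ / sizeof(uint64_t)];
	uint32_t	stored;
	int			slot = 3;	/* hypothetical slot in the double-write file */
	size_t		i;

	/* Fill a fake page and remember the checksum that "went to disk". */
	for (i = 0; i < BLCKSZ / sizeof(uint64_t); i++)
		page[i] = i * 2654435761u;
	stored = compute_checksum(page, BLCKSZ, 0);

	/* Simulate a torn write: only the first half of the page was persisted. */
	memset((char *) page + BLCKSZ / 2, 0, BLCKSZ / 2);

	if (compute_checksum(page, BLCKSZ, 0) != stored)
		printf("torn page detected; restore from double-write slot %d at offset %d\n",
			   slot, DOUBLE_WRITE_HEADER_SIZE + slot * BLCKSZ);
	return 0;
}

As in RecoverDoubleWriteFile(), a block would only be restored if the copy in the double-write slot is itself intact, i.e. its stored checksum matches both the checksum recorded in the header item and the checksum recomputed over the slot's contents.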