diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 1da7dfb..7a3d274 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2474,6 +2474,29 @@ include_dir 'conf.d'
+
+ checkpoint_sort_size (integer)
+
+ checkpoint_sort_size> configuration parameter
+
+
+
+
+ The number of pages in the chunks sorted together before being written
+ out to disk by a checkpoint.
+ For a HDD storage, this setting allows to group together
+ neighboring pages written to disk, thus improving performance by
+ reducing random write activity.
+ This sorting can be skipped for SSD backends as such storages have good
+ random write performance.
+ The default is 131072>.
+ This feature is turned off by setting the value to 0>.
+ This parameter can only be set in the postgresql.conf>
+ file or on the server command line.
+
+
+
+
checkpoint_warning (integer)
@@ -2495,6 +2518,24 @@ include_dir 'conf.d'
+
+ checkpoint_flush_to_disk (bool)
+
+ checkpoint_flush_to_disk> configuration parameter
+
+
+
+
+ When writing data for a checkpoint, hint the underlying OS that the
+ data must be sent to disk as soon as possible. This may help smoothing
+ disk I/O writes and avoid a stall when fsync is issued at the end of
+ the checkpoint, but it may also reduce average performance.
+ This setting may have no effect on some platforms.
+ The default is off>.
+
+
+
+
min_wal_size (integer)
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index f4083c3..2b6aab7 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -546,6 +546,27 @@
+ When hard-disk drives (HDD) are used for terminal data storage,
+ , allows to sort chunks of pages
+ so that neighboring pages on disk will be flushed together by
+ chekpoints, reducing the random write load and improving performance.
+ If solid-state drives (SSD) are used, sorting pages induces no benefit
+ as their random write I/O performance is good: this feature should then
+ be disabled by setting checkpoint_sort_size> to 0>.
+
+
+
+ On Linux and POSIX platforms,
+ allows to hint the OS that pages written on checkpoints must be flushed
+ to disk quickly. Otherwise, these pages may be kept in cache for some time,
+ inducing a stall later when fsync> is called to actually
+ complete the checkpoint. This setting helps to reduce transaction latency,
+ but it may also have a small adverse effect on the average transaction rate
+ at maximum throughput. This feature probably brings no benefit on SSD,
+ as the I/O write latency is small on such hardware, thus it may be disabled.
+
+
+
The number of WAL segment files in pg_xlog> directory depends on
min_wal_size>, max_wal_size> and
the amount of WAL generated in previous checkpoint cycles. When old log
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index bcce3e3..f565dc4 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -918,7 +918,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state)
* Note that we deviate from the usual WAL coding practices here,
* check the above "Logical rewrite support" comment for reasoning.
*/
- written = FileWrite(src->vfd, waldata_start, len);
+ written = FileWrite(src->vfd, waldata_start, len, false, NULL);
if (written != len)
ereport(ERROR,
(errcode_for_file_access(),
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 9431ab5..49ec258 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -203,7 +203,7 @@ btbuildempty(PG_FUNCTION_ARGS)
/* Write the page. If archiving/streaming, XLOG it. */
PageSetChecksumInplace(metapage, BTREE_METAPAGE);
smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
- (char *) metapage, true);
+ (char *) metapage, true, false, NULL);
if (XLogIsNeeded())
log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
BTREE_METAPAGE, metapage, false);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f95f67a..ea7a45d 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -315,7 +315,7 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
{
/* overwriting a block we zero-filled before */
smgrwrite(wstate->index->rd_smgr, MAIN_FORKNUM, blkno,
- (char *) page, true);
+ (char *) page, true, false, NULL);
}
pfree(page);
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index bceee8d..b700efb 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -170,7 +170,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
/* Write the page. If archiving/streaming, XLOG it. */
PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
- (char *) page, true);
+ (char *) page, true, false, NULL);
if (XLogIsNeeded())
log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
SPGIST_METAPAGE_BLKNO, page, false);
@@ -180,7 +180,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
- (char *) page, true);
+ (char *) page, true, false, NULL);
if (XLogIsNeeded())
log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
SPGIST_ROOT_BLKNO, page, true);
@@ -190,7 +190,7 @@ spgbuildempty(PG_FUNCTION_ARGS)
PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
- (char *) page, true);
+ (char *) page, true, false, NULL);
if (XLogIsNeeded())
log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
SPGIST_NULL_BLKNO, page, true);
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 0dce6a8..52dd7db 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -663,7 +663,8 @@ ImmediateCheckpointRequested(void)
* fraction between 0.0 meaning none, and 1.0 meaning all done.
*/
void
-CheckpointWriteDelay(int flags, double progress)
+CheckpointWriteDelay(int flags, double progress,
+ FileFlushContext * context, int ctx_size)
{
static int absorb_counter = WRITES_PER_ABSORB;
@@ -698,6 +699,26 @@ CheckpointWriteDelay(int flags, double progress)
*/
pgstat_send_bgwriter();
+#if defined(HAVE_SYNC_FILE_RANGE) || defined(HAVE_POSIX_FADVISE)
+
+ /*
+ * Before sleeping, flush written blocks for each tablespace.
+ */
+ if (checkpoint_flush_to_disk)
+ {
+ int i;
+
+ for (i = 0; i < ctx_size; i++)
+ {
+ if (context[i].ncalls != 0)
+ {
+ PerformFileFlush(&context[i]);
+ ResetFileFlushContext(&context[i]);
+ }
+ }
+ }
+#endif /* HAVE_SYNC_FILE_RANGE || HAVE_POSIX_FADVISE */
+
/*
* This sleep used to be connected to bgwriter_delay, typically 200ms.
* That resulted in more frequent wakeups if not much work to do.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index cc973b5..3ea1028 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -80,6 +80,10 @@ bool zero_damaged_pages = false;
int bgwriter_lru_maxpages = 100;
double bgwriter_lru_multiplier = 2.0;
bool track_io_timing = false;
+/* hint to move writes to high priority */
+bool checkpoint_flush_to_disk = false;
+/* by default, sort by chunks of 1 GB worth of 8 kB buffers */
+int checkpoint_sort_size = 128 * 1024;
/*
* How many buffers PrefetchBuffer callers should try to stay ahead of their
@@ -396,7 +400,8 @@ static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(volatile BufferDesc *buf);
static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
static void BufferSync(int flags);
-static int SyncOneBuffer(int buf_id, bool skip_recently_used);
+static int SyncOneBuffer(int buf_id, bool skip_recently_used,
+ bool flush_to_disk, FileFlushContext *context);
static void WaitIO(volatile BufferDesc *buf);
static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
@@ -409,7 +414,8 @@ static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr);
-static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
+static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln,
+ bool flush_to_disk, FileFlushContext *context);
static void AtProcExit_Buffers(int code, Datum arg);
static void CheckForBufferLeaks(void);
static int rnode_comparator(const void *p1, const void *p2);
@@ -1018,7 +1024,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
smgr->smgr_rnode.node.dbNode,
smgr->smgr_rnode.node.relNode);
- FlushBuffer(buf, NULL);
+ FlushBuffer(buf, NULL, false, NULL);
LWLockRelease(buf->content_lock);
TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
@@ -1561,6 +1567,75 @@ UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
}
}
+/* Array of buffer ids of all buffers to checkpoint.
+ */
+static int * CheckpointBufferIds = NULL;
+
+/* Compare checkpoint buffers
+ */
+static int bufcmp(const int * pa, const int * pb)
+{
+ BufferDesc
+ *a = GetBufferDescriptor(*pa),
+ *b = GetBufferDescriptor(*pb);
+
+ /* tag: rnode, forkNum (different files), blockNum
+ * rnode: { spcNode (ignore: not really needed),
+ * dbNode (ignore: this is a directory), relNode }
+ * spcNode: table space oid, not that there are at least two
+ * (pg_global and pg_default).
+ */
+ /* compare relation */
+ if (a->tag.rnode.relNode < b->tag.rnode.relNode)
+ return -1;
+ else if (a->tag.rnode.relNode > b->tag.rnode.relNode)
+ return 1;
+ /* same relation, compare fork */
+ else if (a->tag.forkNum < b->tag.forkNum)
+ return -1;
+ else if (a->tag.forkNum > b->tag.forkNum)
+ return 1;
+ /* same relation/fork, so same segmented "file", compare block number
+ * which are mapped on different segments depending on the number.
+ */
+ else if (a->tag.blockNum < b->tag.blockNum)
+ return -1;
+ else /* should not be the same block anyway... */
+ return 1;
+}
+
+static void AllocateCheckpointBufferIds(void)
+{
+ /* Safe worst case allocation, all buffers belong to the checkpoint...
+ * that is pretty unlikely.
+ */
+ CheckpointBufferIds = (int *) palloc(sizeof(int) * NBuffers);
+}
+
+/* Status of buffers to checkpoint for a particular tablespace,
+ * used internally in BufferSync.
+ * - spcNone: oid of the tablespace
+ * - num_to_write: number of pages counted for this tablespace
+ * - num_written: number of pages actually written out
+ * - index: scanning position in CheckpointBufferIds for this tablespace
+ * - done: whether it is done
+ */
+typedef struct TableSpaceCheckpointStatus {
+ Oid space;
+ int num_to_write;
+ int num_written;
+ int index;
+ bool done;
+} TableSpaceCheckpointStatus;
+
+/* entry structure for table space to count hashtable,
+ * used internally in BufferSync.
+ */
+typedef struct TableSpaceCountEntry {
+ Oid space;
+ int count;
+} TableSpaceCountEntry;
+
/*
* BufferSync -- Write out all dirty buffers in the pool.
*
@@ -1575,10 +1650,21 @@ static void
BufferSync(int flags)
{
int buf_id;
- int num_to_scan;
int num_to_write;
int num_written;
+ int i;
int mask = BM_DIRTY;
+ HTAB *spcBuffers;
+ TableSpaceCheckpointStatus *spcStatus = NULL;
+ int nb_spaces, active_spaces, space;
+ FileFlushContext * spcContext = NULL;
+
+ /*
+ * Lazy allocation: this function is called through the checkpointer,
+ * but also by initdb. Maybe the allocation could be moved to the callers.
+ */
+ if (CheckpointBufferIds == NULL)
+ AllocateCheckpointBufferIds();
/* Make sure we can handle the pin inside SyncOneBuffer */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@@ -1609,6 +1695,18 @@ BufferSync(int flags)
* certainly need to be written for the next checkpoint attempt, too.
*/
num_to_write = 0;
+
+ /* initialize oid -> int buffer count hash table */
+ {
+ HASHCTL ctl;
+
+ MemSet(&ctl, 0, sizeof(HASHCTL));
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(TableSpaceCountEntry);
+ spcBuffers = hash_create("Number of buffers to write per tablespace",
+ 16, &ctl, HASH_ELEM | HASH_BLOBS);
+ }
+
for (buf_id = 0; buf_id < NBuffers; buf_id++)
{
volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
@@ -1621,32 +1719,185 @@ BufferSync(int flags)
if ((bufHdr->flags & mask) == mask)
{
+ Oid spc;
+ TableSpaceCountEntry * entry;
+ bool found;
+
bufHdr->flags |= BM_CHECKPOINT_NEEDED;
+ CheckpointBufferIds[num_to_write] = buf_id;
num_to_write++;
+
+ /* keep track of per tablespace buffers */
+ spc = bufHdr->tag.rnode.spcNode;
+ entry = (TableSpaceCountEntry *)
+ hash_search(spcBuffers, (void *) &spc, HASH_ENTER, &found);
+
+ if (found) entry->count++;
+ else entry->count = 1;
}
UnlockBufHdr(bufHdr);
}
if (num_to_write == 0)
+ {
+ hash_destroy(spcBuffers);
return; /* nothing to do */
+ }
TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
+ /* Build checkpoint tablespace buffer status & flush context arrays */
+ nb_spaces = hash_get_num_entries(spcBuffers);
+ spcStatus = (TableSpaceCheckpointStatus *)
+ palloc(sizeof(TableSpaceCheckpointStatus) * nb_spaces);
+ spcContext = (FileFlushContext *)
+ palloc(sizeof(FileFlushContext) * nb_spaces);
+
+ {
+ int index = 0;
+ HASH_SEQ_STATUS hseq;
+ TableSpaceCountEntry * entry;
+
+ hash_seq_init(&hseq, spcBuffers);
+ while ((entry = (TableSpaceCountEntry *) hash_seq_search(&hseq)))
+ {
+ Assert(index < nb_spaces);
+ spcStatus[index].space = entry->space;
+ spcStatus[index].num_to_write = entry->count;
+ spcStatus[index].num_written = 0;
+ /* should it be randomized? chosen with some criterion? */
+ spcStatus[index].index = 0;
+ spcStatus[index].done = false;
+
+#if defined(HAVE_SYNC_FILE_RANGE) || defined(HAVE_POSIX_FADVISE)
+
+ ResetFileFlushContext(&spcContext[index]);
+
+#endif /* HAVE_SYNC_FILE_RANGE || HAVE_POSIX_FADVISE */
+
+ index ++;
+ }
+ }
+
+ hash_destroy(spcBuffers);
+ spcBuffers = NULL;
+
/*
- * Loop over all buffers again, and write the ones (still) marked with
- * BM_CHECKPOINT_NEEDED. In this loop, we start at the clock sweep point
- * since we might as well dump soon-to-be-recycled buffers first.
+ * Sort buffer ids by chunks to help find sequential writes.
+ * Note: buffers are not locked in anyway, but that does not matter,
+ * this sorting is really advisory, if some buffer changes status during
+ * this pass it will be filtered out later. The only necessary property
+ * is that marked buffers do not move elsewhere. Also, qsort implementation
+ * should be resilient to occasional contradictions (cmp(a,b) != -cmp(b,a))
+ * because of these possible concurrent changes.
+ */
+ if (checkpoint_sort_size > 1)
+ {
+ /* debug...
+ ereport(WARNING,
+ (errcode(ERRCODE_WARNING),
+ errmsg("Checkpoint: sorting %d buffers (%d chunks, size=%d)",
+ num_to_write,
+ (checkpoint_sort_size+num_to_write-1) /
+ checkpoint_sort_size,
+ checkpoint_sort_size)));
+ */
+
+ for (i = 0; i < num_to_write; i += checkpoint_sort_size)
+ qsort(CheckpointBufferIds + i,
+ (i + checkpoint_sort_size <= num_to_write ?
+ checkpoint_sort_size : num_to_write - i),
+ sizeof(int),
+ (int(*)(const void *, const void *)) bufcmp);
+ }
+
+ /* debug
+ ereport(WARNING,
+ (errcode(ERRCODE_WARNING),
+ errmsg("Checkpoint: running on %d tablespaces",
+ nb_spaces)));
+ */
+
+ /*
+ * Loop over buffers to write through CheckpointBufferIds,
+ * and write the ones (still) marked with BM_CHECKPOINT_NEEDED,
+ * with some round robin over table spaces so as to balance writes,
+ * so that buffer writes move forward roughly proportionally for each
+ * tablespace.
*
- * Note that we don't read the buffer alloc count here --- that should be
- * left untouched till the next BgBufferSync() call.
+ * Termination: if a tablespace is selected by the inner while loop
+ * (see argument there), its index is incremented and will eventually
+ * reach num_to_write, mark this table space scanning as done and
+ * decrement the number of active spaces, which will thus reach 0.
*/
- buf_id = StrategySyncStart(NULL, NULL);
- num_to_scan = NBuffers;
+ active_spaces = nb_spaces;
+ space = 0;
num_written = 0;
- while (num_to_scan-- > 0)
+
+ while (active_spaces != 0)
{
- volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+ volatile BufferDesc *bufHdr = NULL;
+ int index;
+
+ /*
+ * Select a tablespace depending on the current overall progress.
+ *
+ * The progress ratio of each unfinished tablespace is compared to
+ * the overall progress ratio to find one with is not in advance
+ * (i.e. tablespace ratio <= overall ratio).
+ *
+ * Existence: it is bound to exist otherwise the overall progress
+ * ratio would be inconsistent: with positive buffers to write (t1 & t2)
+ * and already written buffers (w1 & w2), we have:
+ *
+ * If w1/t1 > (w1+w2)/(t1+t2) # one table space is in advance
+ * => w1t1+w1t2 > w1t1+w2t1 => w1t2 > w2t1 => w1t2+w2t2 > w2t1+w2t2
+ * => (w1+w2) / (t1+t2) > w2 / t2 # the other one is late
+ *
+ * The round robin ensures that each space is given some attention
+ * till it is over the current ratio, before going to the next.
+ *
+ * Precision: using int32 computations for comparing fractions
+ * (w1 / t1 > w / t <=> w1 t > w t1) seems a bad idea as the values
+ * can overflow 32-bit integers: the limit would be sqrt(2**31) ~
+ * 46340 buffers, i.e. a 362 MB checkpoint. So ensure that 64-bit
+ * integers are used in the comparison.
+ */
+ while (spcStatus[space].done ||
+ /* compare tablespace vs overall progress ratio:
+ * tablespace written/to_write > overall written/to_write
+ */
+ (int64) spcStatus[space].num_written * num_to_write >
+ (int64) num_written * spcStatus[space].num_to_write)
+ space = (space + 1) % nb_spaces; /* round robin */
+
+ /*
+ * Find a valid buffer in the selected tablespace,
+ * by continuing the tablespace specific buffer scan
+ * where it was left.
+ */
+ index = spcStatus[space].index;
+
+ while (index < num_to_write && bufHdr == NULL)
+ {
+ buf_id = CheckpointBufferIds[index];
+ bufHdr = GetBufferDescriptor(buf_id);
+
+ /* Skip if in another tablespace or not in checkpoint anymore.
+ * No lock is acquired, see comments below.
+ */
+ if (spcStatus[space].space != bufHdr->tag.rnode.spcNode ||
+ ! (bufHdr->flags & BM_CHECKPOINT_NEEDED))
+ {
+ index ++;
+ buf_id = -1;
+ bufHdr = NULL;
+ }
+ }
+
+ /* Update tablespace writing status, will start over at next index */
+ spcStatus[space].index = index+1;
/*
* We don't need to acquire the lock here, because we're only looking
@@ -1660,39 +1911,49 @@ BufferSync(int flags)
* write the buffer though we didn't need to. It doesn't seem worth
* guarding against this, though.
*/
- if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
+ if (bufHdr != NULL && bufHdr->flags & BM_CHECKPOINT_NEEDED)
{
- if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
+ if (SyncOneBuffer(buf_id, false, checkpoint_flush_to_disk,
+ &spcContext[space]) & BUF_WRITTEN)
{
TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
BgWriterStats.m_buf_written_checkpoints++;
+ spcStatus[space].num_written++;
num_written++;
/*
- * We know there are at most num_to_write buffers with
- * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
- * num_written reaches num_to_write.
- *
- * Note that num_written doesn't include buffers written by
- * other backends, or by the bgwriter cleaning scan. That
- * means that the estimate of how much progress we've made is
- * conservative, and also that this test will often fail to
- * trigger. But it seems worth making anyway.
- */
- if (num_written >= num_to_write)
- break;
-
- /*
* Sleep to throttle our I/O rate.
*/
- CheckpointWriteDelay(flags, (double) num_written / num_to_write);
+ CheckpointWriteDelay(flags, (double) num_written / num_to_write,
+ spcContext, nb_spaces);
}
}
- if (++buf_id >= NBuffers)
- buf_id = 0;
+ /*
+ * Detect checkpoint end for a tablespace: either the scan is done
+ * or all tablespace buffers have been written out.
+ */
+ if (spcStatus[space].index >= num_to_write ||
+ spcStatus[space].num_written >= spcStatus[space].num_to_write)
+ {
+
+#if defined(HAVE_SYNC_FILE_RANGE) || defined(HAVE_POSIX_FADVISE)
+
+ PerformFileFlush(&spcContext[space]);
+ ResetFileFlushContext(&spcContext[space]);
+
+#endif /* HAVE_SYNC_FILE_RANGE || HAVE_POSIX_FADVISE */
+
+ spcStatus[space].done = true;
+ active_spaces--;
+ }
}
+ pfree(spcStatus);
+ spcStatus = NULL;
+ pfree(spcContext);
+ spcContext = NULL;
+
/*
* Update checkpoint statistics. As noted above, this doesn't include
* buffers written by other backends or bgwriter scan.
@@ -1939,7 +2200,8 @@ BgBufferSync(void)
/* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{
- int buffer_state = SyncOneBuffer(next_to_clean, true);
+ int buffer_state =
+ SyncOneBuffer(next_to_clean, true, false, NULL);
if (++next_to_clean >= NBuffers)
{
@@ -2016,7 +2278,8 @@ BgBufferSync(void)
* Note: caller must have done ResourceOwnerEnlargeBuffers.
*/
static int
-SyncOneBuffer(int buf_id, bool skip_recently_used)
+SyncOneBuffer(int buf_id, bool skip_recently_used, bool flush_to_disk,
+ FileFlushContext * context)
{
volatile BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0;
@@ -2057,7 +2320,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
PinBuffer_Locked(bufHdr);
LWLockAcquire(bufHdr->content_lock, LW_SHARED);
- FlushBuffer(bufHdr, NULL);
+ FlushBuffer(bufHdr, NULL, flush_to_disk, context);
LWLockRelease(bufHdr->content_lock);
UnpinBuffer(bufHdr, true);
@@ -2319,9 +2582,16 @@ BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
*
* If the caller has an smgr reference for the buffer's relation, pass it
* as the second parameter. If not, pass NULL.
+ *
+ * The third parameter tries to hint the OS that a high priority write is meant,
+ * possibly because io-throttling is already managed elsewhere.
+ * The last parameter holds the current flush context that accumulates flush
+ * requests to be performed in one call, instead of being performed on a buffer
+ * per buffer basis.
*/
static void
-FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
+FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln, bool flush_to_disk,
+ FileFlushContext * context)
{
XLogRecPtr recptr;
ErrorContextCallback errcallback;
@@ -2410,7 +2680,9 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
buf->tag.forkNum,
buf->tag.blockNum,
bufToWrite,
- false);
+ false,
+ flush_to_disk,
+ context);
if (track_io_timing)
{
@@ -2830,7 +3102,9 @@ FlushRelationBuffers(Relation rel)
bufHdr->tag.forkNum,
bufHdr->tag.blockNum,
localpage,
- false);
+ false,
+ false,
+ NULL);
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
@@ -2864,7 +3138,7 @@ FlushRelationBuffers(Relation rel)
{
PinBuffer_Locked(bufHdr);
LWLockAcquire(bufHdr->content_lock, LW_SHARED);
- FlushBuffer(bufHdr, rel->rd_smgr);
+ FlushBuffer(bufHdr, rel->rd_smgr, false, NULL);
LWLockRelease(bufHdr->content_lock);
UnpinBuffer(bufHdr, true);
}
@@ -2916,7 +3190,7 @@ FlushDatabaseBuffers(Oid dbid)
{
PinBuffer_Locked(bufHdr);
LWLockAcquire(bufHdr->content_lock, LW_SHARED);
- FlushBuffer(bufHdr, NULL);
+ FlushBuffer(bufHdr, NULL, false, NULL);
LWLockRelease(bufHdr->content_lock);
UnpinBuffer(bufHdr, true);
}
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 3144afe..114a0a6 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -208,7 +208,9 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
bufHdr->tag.forkNum,
bufHdr->tag.blockNum,
localpage,
- false);
+ false,
+ false,
+ NULL);
/* Mark not-dirty now in case we error out below */
bufHdr->flags &= ~BM_DIRTY;
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index ea4d689..fb3b383 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -317,7 +317,7 @@ BufFileDumpBuffer(BufFile *file)
return; /* seek failed, give up */
file->offsets[file->curFile] = file->curOffset;
}
- bytestowrite = FileWrite(thisfile, file->buffer + wpos, bytestowrite);
+ bytestowrite = FileWrite(thisfile, file->buffer + wpos, bytestowrite, false, NULL);
if (bytestowrite <= 0)
return; /* failed to write */
file->offsets[file->curFile] += bytestowrite;
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 1ba4946..daf03e4 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -1344,8 +1344,97 @@ retry:
return returnCode;
}
+#if defined(HAVE_SYNC_FILE_RANGE) || defined(HAVE_POSIX_FADVISE)
+
+void
+ResetFileFlushContext(FileFlushContext * context)
+{
+ context->fd = 0;
+ context->ncalls = 0;
+ context->offset = 0;
+ context->nbytes = 0;
+ context->filename = NULL;
+}
+
+void
+PerformFileFlush(FileFlushContext * context)
+{
+ if (context->ncalls != 0)
+ {
+ int rc;
+
+#if defined(HAVE_SYNC_FILE_RANGE)
+
+ /* Linux: tell the memory manager to move these blocks to io so
+ * that they are considered for being actually written to disk.
+ */
+ rc = sync_file_range(context->fd, context->offset, context->nbytes,
+ SYNC_FILE_RANGE_WRITE);
+
+#elif defined(HAVE_POSIX_FADVISE)
+
+ /* Others: say that data should not be kept in memory...
+ * This is not exactly what we want to say, because we want to write
+ * the data for durability but we may need it later nevertheless.
+ * It seems that Linux would free the memory *if* the data has
+ * already been written do disk, else the "dontneed" call is ignored.
+ * For FreeBSD this may have the desired effect of moving the
+ * data to the io layer, although the system does not seem to
+ * take into account the provided offset & size, so it is rather
+ * rough...
+ */
+ rc = posix_fadvise(context->fd, context->offset, context->nbytes,
+ POSIX_FADV_DONTNEED);
+
+#endif
+
+ if (rc < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not flush block " INT64_FORMAT
+ " on " INT64_FORMAT " blocks in file \"%s\": %m",
+ context->offset / BLCKSZ,
+ context->nbytes / BLCKSZ,
+ context->filename)));
+ }
+}
+
+void
+FileAsynchronousFlush(FileFlushContext * context,
+ int fd, off_t offset, off_t nbytes, char * filename)
+{
+ if (context->ncalls != 0 && context->fd == fd)
+ {
+ /* Same file: merge current flush with previous ones */
+ off_t new_offset = offset < context->offset? offset: context->offset;
+
+ context->nbytes =
+ (context->offset + context->nbytes > offset + nbytes ?
+ context->offset + context->nbytes : offset + nbytes) -
+ new_offset;
+ context->offset = new_offset;
+ context->ncalls ++;
+ }
+ else
+ {
+ /* file has changed; actually flush previous file before restarting
+ * to accumulate flushes
+ */
+ PerformFileFlush(context);
+
+ context->fd = fd;
+ context->ncalls = 1;
+ context->offset = offset;
+ context->nbytes = nbytes;
+ context->filename = filename;
+ }
+}
+
+#endif /* HAVE_SYNC_FILE_RANGE || HAVE_POSIX_FADVISE */
+
int
-FileWrite(File file, char *buffer, int amount)
+FileWrite(File file, char *buffer, int amount, bool flush_to_disk,
+ FileFlushContext * context)
{
int returnCode;
@@ -1395,6 +1484,28 @@ retry:
if (returnCode >= 0)
{
+
+#if defined(HAVE_SYNC_FILE_RANGE) || defined(HAVE_POSIX_FADVISE)
+
+ /*
+ * Calling "write" tells the OS that pg wants to write some page to disk,
+ * however when it is really done is chosen by the OS.
+ * Depending on other disk activities this may be delayed significantly,
+ * maybe up to an "fsync" call, which could induce an IO write surge.
+ * When checkpointing pg is doing its own throttling and the result
+ * should really be written to disk with high priority, so as to meet
+ * the completion target.
+ * This call hints that such write have a higher priority.
+ */
+ if (flush_to_disk && returnCode == amount && errno == 0)
+ {
+ FileAsynchronousFlush(context,
+ VfdCache[file].fd, VfdCache[file].seekPos,
+ amount, VfdCache[file].fileName);
+ }
+
+#endif /* HAVE_SYNC_FILE_RANGE || HAVE_POSIX_FADVISE */
+
VfdCache[file].seekPos += returnCode;
/* maintain fileSize and temporary_files_size if it's a temp file */
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 42a43bb..dbf057f 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -531,7 +531,7 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
errmsg("could not seek to block %u in file \"%s\": %m",
blocknum, FilePathName(v->mdfd_vfd))));
- if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ)) != BLCKSZ)
+ if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, false, NULL)) != BLCKSZ)
{
if (nbytes < 0)
ereport(ERROR,
@@ -738,7 +738,8 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
*/
void
mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
- char *buffer, bool skipFsync)
+ char *buffer, bool skipFsync, bool flush_to_disk,
+ FileFlushContext * context)
{
off_t seekpos;
int nbytes;
@@ -767,7 +768,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
errmsg("could not seek to block %u in file \"%s\": %m",
blocknum, FilePathName(v->mdfd_vfd))));
- nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ);
+ nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, flush_to_disk, context);
TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
reln->smgr_rnode.node.spcNode,
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 244b4ea..2db3cd3 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -52,7 +52,8 @@ typedef struct f_smgr
void (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer);
void (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
- BlockNumber blocknum, char *buffer, bool skipFsync);
+ BlockNumber blocknum, char *buffer, bool skipFsync,
+ bool flush_to_disk, FileFlushContext *context);
BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
@@ -643,10 +644,11 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
*/
void
smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
- char *buffer, bool skipFsync)
+ char *buffer, bool skipFsync, bool flush_to_disk,
+ FileFlushContext * context)
{
(*(smgrsw[reln->smgr_which].smgr_write)) (reln, forknum, blocknum,
- buffer, skipFsync);
+ buffer, skipFsync, flush_to_disk, context);
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 230c5cc..d71cef7 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -158,6 +158,7 @@ static bool check_bonjour(bool *newval, void **extra, GucSource source);
static bool check_ssl(bool *newval, void **extra, GucSource source);
static bool check_stage_log_stats(bool *newval, void **extra, GucSource source);
static bool check_log_stats(bool *newval, void **extra, GucSource source);
+static bool check_flush_to_disk(bool *newval, void **extra, GucSource source);
static bool check_canonical_path(char **newval, void **extra, GucSource source);
static bool check_timezone_abbreviations(char **newval, void **extra, GucSource source);
static void assign_timezone_abbreviations(const char *newval, void *extra);
@@ -1009,6 +1010,17 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
+
+ {
+ {"checkpoint_flush_to_disk", PGC_SIGHUP, WAL_CHECKPOINTS,
+ gettext_noop("Hint that checkpoint's writes are high priority."),
+ NULL
+ },
+ &checkpoint_flush_to_disk,
+ false,
+ check_flush_to_disk, NULL, NULL
+ },
+
{
{"log_connections", PGC_SU_BACKEND, LOGGING_WHAT,
gettext_noop("Logs each successful connection."),
@@ -2205,6 +2217,16 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"checkpoint_sort_size", PGC_SIGHUP, WAL_CHECKPOINTS,
+ gettext_noop("Set the number of disk-page buffers sorted together on checkpoints."),
+ NULL
+ },
+ &checkpoint_sort_size,
+ 128*1024, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_buffers", PGC_POSTMASTER, WAL_SETTINGS,
gettext_noop("Sets the number of disk-page buffers in shared memory for WAL."),
NULL,
@@ -9760,6 +9782,21 @@ check_log_stats(bool *newval, void **extra, GucSource source)
}
static bool
+check_flush_to_disk(bool *newval, void **extra, GucSource source)
+{
+/* This test must be consistent with the one in FileWrite (storage/file/fd.c)
+ */
+#if ! (defined(HAVE_SYNC_FILE_RANGE) || defined(HAVE_POSIX_FADVISE))
+ /* just warn if it has no effect */
+ ereport(WARNING,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Setting \"checkpoint_flush_to_disk\" has no effect "
+ "on this platform.")));
+#endif /* HAVE_SYNC_FILE_RANGE */
+ return true;
+}
+
+static bool
check_canonical_path(char **newval, void **extra, GucSource source)
{
/*
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 06dfc06..630100d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -202,6 +202,8 @@
#max_wal_size = 1GB
#min_wal_size = 80MB
#checkpoint_completion_target = 0.5 # checkpoint target duration, 0.0 - 1.0
+#checkpoint_sort_size = 131072 # sort checkpoint buffers by chunks; 0 disables
+#checkpoint_flush_to_disk = off # send checkpoint buffers to disk
#checkpoint_warning = 30s # 0 disables
# - Archiving -
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index a49c208..f9c8ca1 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -16,6 +16,7 @@
#define _BGWRITER_H
#include "storage/block.h"
+#include "storage/fd.h"
#include "storage/relfilenode.h"
@@ -29,7 +30,8 @@ extern void BackgroundWriterMain(void) pg_attribute_noreturn();
extern void CheckpointerMain(void) pg_attribute_noreturn();
extern void RequestCheckpoint(int flags);
-extern void CheckpointWriteDelay(int flags, double progress);
+extern void CheckpointWriteDelay(int flags, double progress,
+ FileFlushContext * context, int ctx_size);
extern bool ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
BlockNumber segno);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ec0a254..0534155 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -54,6 +54,8 @@ extern int bgwriter_lru_maxpages;
extern double bgwriter_lru_multiplier;
extern bool track_io_timing;
extern int target_prefetch_pages;
+extern bool checkpoint_flush_to_disk;
+extern int checkpoint_sort_size;
/* in buf_init.c */
extern PGDLLIMPORT char *BufferBlocks;
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 7eabe09..c740ee7 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -59,6 +59,22 @@ extern int max_files_per_process;
*/
extern int max_safe_fds;
+/* FileFlushContext:
+ * This structure is used to accumulate several flush requests on a file
+ * into a larger flush request.
+ * - fd: file descriptor of the file
+ * - ncalls: number of flushes merged together
+ * - offset: starting offset (minimum of all offset)
+ * - nbytes: size (minimum extent to cover all flushed data)
+ * - filename: filename of fd for error messages
+ */
+typedef struct FileFlushContext{
+ int fd;
+ int ncalls;
+ off_t offset;
+ off_t nbytes;
+ char * filename;
+} FileFlushContext;
/*
* prototypes for functions in fd.c
@@ -70,7 +86,12 @@ extern File OpenTemporaryFile(bool interXact);
extern void FileClose(File file);
extern int FilePrefetch(File file, off_t offset, int amount);
extern int FileRead(File file, char *buffer, int amount);
-extern int FileWrite(File file, char *buffer, int amount);
+extern void ResetFileFlushContext(FileFlushContext * context);
+extern void PerformFileFlush(FileFlushContext * context);
+extern void FileAsynchronousFlush(FileFlushContext * context,
+ int fd, off_t offset, off_t nbytes, char * filename);
+extern int FileWrite(File file, char *buffer, int amount, bool flush_to_disk,
+ FileFlushContext * context);
extern int FileSync(File file);
extern off_t FileSeek(File file, off_t offset, int whence);
extern int FileTruncate(File file, off_t offset);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 69a624f..a46a70c 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -16,6 +16,7 @@
#include "fmgr.h"
#include "storage/block.h"
+#include "storage/fd.h"
#include "storage/relfilenode.h"
@@ -95,7 +96,8 @@ extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum,
extern void smgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
- BlockNumber blocknum, char *buffer, bool skipFsync);
+ BlockNumber blocknum, char *buffer, bool skipFsync,
+ bool flush_to_disk, FileFlushContext * context);
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
@@ -120,8 +122,9 @@ extern void mdprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
-extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
- BlockNumber blocknum, char *buffer, bool skipFsync);
+extern void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+ char *buffer, bool skipFsync, bool flush_to_disk,
+ FileFlushContext * context);
extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);