From 1955cb9f68f9c027c566c4909d7ead475cf20f3b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Mon, 8 Aug 2016 16:43:55 -0700
Subject: [PATCH 2/2] Block level parallel Vacuum.

---
 src/backend/access/nbtree/nbtutils.c |  19 ---
 src/backend/commands/vacuum.c        |   1 +
 src/backend/commands/vacuumlazy.c    | 239 ++++++++++++++++++++++++++++-------
 src/backend/utils/misc/guc.c         |  10 ++
 src/include/commands/vacuum.h        |  41 +++++-
 src/include/storage/buf_internals.h  |   1 +
 6 files changed, 245 insertions(+), 66 deletions(-)

diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 5d335c7..987aceb 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1918,25 +1918,6 @@ _bt_start_vacuum(Relation rel)
 	if (result == 0 || result > MAX_BT_CYCLE_ID)
 		result = btvacinfo->cycle_ctr = 1;
 
-	/* Let's just make sure there's no entry already for this index */
-	for (i = 0; i < btvacinfo->num_vacuums; i++)
-	{
-		vac = &btvacinfo->vacuums[i];
-		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
-			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
-		{
-			/*
-			 * Unlike most places in the backend, we have to explicitly
-			 * release our LWLock before throwing an error.  This is because
-			 * we expect _bt_end_vacuum() to be called before transaction
-			 * abort cleanup can run to release LWLocks.
-			 */
-			LWLockRelease(BtreeVacuumLock);
-			elog(ERROR, "multiple active vacuums for index \"%s\"",
-				 RelationGetRelationName(rel));
-		}
-	}
-
 	/* OK, add an entry */
 	if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
 	{
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0563e63..1562773 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -58,6 +58,7 @@ int			vacuum_freeze_min_age;
 int			vacuum_freeze_table_age;
 int			vacuum_multixact_freeze_min_age;
 int			vacuum_multixact_freeze_table_age;
+int			parallel_vacuum_workers;
 
 
 /* A few variables that don't seem worth passing around as parameters */
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 231e92d..4fc880d 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -42,8 +42,10 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
@@ -98,33 +100,9 @@
  */
 #define SKIP_PAGES_THRESHOLD	((BlockNumber) 32)
 
-typedef struct LVRelStats
-{
-	/* hasindex = true means two-pass strategy; false means one-pass */
-	bool		hasindex;
-	/* Overall statistics about rel */
-	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
-	BlockNumber rel_pages;		/* total number of pages */
-	BlockNumber scanned_pages;	/* number of pages we examined */
-	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
-	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
-	double		scanned_tuples; /* counts only tuples on scanned pages */
-	double		old_rel_tuples; /* previous value of pg_class.reltuples */
-	double		new_rel_tuples; /* new estimated total # of tuples */
-	double		new_dead_tuples;	/* new estimated total # of dead tuples */
-	BlockNumber pages_removed;
-	double		tuples_deleted;
-	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
-	/* List of TIDs of tuples we intend to delete */
-	/* NB: this list is ordered by TID address */
-	int			num_dead_tuples;	/* current # of entries */
-	int			max_dead_tuples;	/* # slots allocated in array */
-	ItemPointer dead_tuples;	/* array of ItemPointerData */
-	int			num_index_scans;
-	TransactionId latestRemovedXid;
-	bool		lock_waiter_detected;
-} LVRelStats;
-
+/* DSM keys for block-level parallel vacuum */
+#define VACUUM_KEY_TASK			50
+#define VACUUM_KEY_WORKER_STATS	51
 
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
@@ -137,9 +115,6 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static void lazy_scan_heap(Relation onerel, int options,
-			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
-			   bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
@@ -162,6 +137,18 @@ static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(Relation rel, Buffer buf,
 					 TransactionId *visibility_cutoff_xid,
 					 bool *all_frozen);
+/* functions for parallel vacuum */
+static void parallel_lazy_scan_heap(Relation rel, LVRelStats *vacrelstats,
+					Relation *Irel, int nindexes, int options,
+					bool aggressive, int wnum);
+static void vacuum_worker(dsm_segment *seg, shm_toc *toc);
+static void lazy_scan_heap(Relation onerel, int options,
+			   LVRelStats *vacrelstats, Relation *Irel,
+			   int nindexes, bool aggressive,
+			   BlockNumber begin, BlockNumber nblocks);
+static void gather_vacuum_stats(LVRelStats *vacrelstats, LVRelStats *worker_stats,
+					int wnum);
+
 
 /*
  * lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
@@ -248,8 +235,17 @@ lazy_vacuum_rel(Relation onerel, int options, VacuumParams *params,
 	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
 	vacrelstats->hasindex = (nindexes > 0);
 
-	/* Do the vacuuming */
-	lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes, aggressive);
+	/* Do the vacuuming, in parallel if requested */
+	if (parallel_vacuum_workers > 1)
+		parallel_lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, options,
+								aggressive, parallel_vacuum_workers);
+	else
+	{
+		BlockNumber nblocks = RelationGetNumberOfBlocks(onerel);
+
+		lazy_scan_heap(onerel, options, vacrelstats, Irel, nindexes,
+					   aggressive, 0, nblocks);
+	}
 
 	/* Done with indexes */
 	vac_close_indexes(nindexes, Irel, NoLock);
@@ -428,7 +424,132 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 }
 
 /*
- * lazy_scan_heap() -- scan an open heap relation
+ * Launch the parallel vacuum workers specified by parallel_vacuum_workers and
+ * then gather the resulting statistics from each worker. The idea behind
+ * vacuuming one relation with multiple workers in parallel is that each worker
+ * is assigned a particular block range of the relation, calculated from
+ * parallel_vacuum_workers and the number of blocks in the relation. This
+ * information, along with the thresholds (e.g. OldestXmin, FreezeLimit,
+ * MultiXactCutoff), is stored in the DSM segment under VACUUM_KEY_TASK. Each
+ * worker collects dead tuple TIDs and reclaims them as well. Per-worker vacuum
+ * statistics are stored in the DSM segment under VACUUM_KEY_WORKER_STATS and
+ * are gathered by the leader process after all workers have finished their tasks.
+ */
+static void
+parallel_lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
+						Relation *Irel, int nindexes, int options,
+						bool aggressive, int wnum)
+{
+	ParallelContext *pcxt;
+	LVRelStats *wstats_space;
+	VacuumTask *task_space;
+	IndexBulkDeleteResult **indstats;
+	int			size = 0;
+	int			i;
+
+	EnterParallelMode();
+
+	/* Create parallel context and initialize it */
+	pcxt = CreateParallelContext(vacuum_worker, wnum);
+	size += BUFFERALIGN(sizeof(VacuumTask)); /* For task */
+	size += BUFFERALIGN(sizeof(LVRelStats) * pcxt->nworkers); /* For worker stats */
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 2);
+	InitializeParallelDSM(pcxt);
+
+	/* Prepare the shared VacuumTask space */
+	task_space = (VacuumTask *) shm_toc_allocate(pcxt->toc, sizeof(VacuumTask));
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_TASK, task_space);
+	task_space->relid = RelationGetRelid(onerel);
+	task_space->aggressive = aggressive;
+	task_space->options = options;
+	task_space->oldestxmin = OldestXmin;
+	task_space->freezelimit = FreezeLimit;
+	task_space->multixactcutoff = MultiXactCutoff;
+	task_space->wnum = wnum;
+	task_space->elevel = elevel;
+
+	/* Prepare the per-worker LVRelStats space */
+	wstats_space = (LVRelStats *) shm_toc_allocate(pcxt->toc,
+												   sizeof(LVRelStats) * pcxt->nworkers);
+	shm_toc_insert(pcxt->toc, VACUUM_KEY_WORKER_STATS, wstats_space);
+	for (i = 0; i < pcxt->nworkers; i++)
+	{
+		LVRelStats *wstats = wstats_space + i;
+		memcpy(wstats, vacrelstats, sizeof(LVRelStats));
+	}
+
+	/* Do parallel vacuum */
+	LaunchParallelWorkers(pcxt);
+
+	/* Wait for workers to finish vacuuming */
+	WaitForParallelWorkersToFinish(pcxt);
+	gather_vacuum_stats(vacrelstats, wstats_space, wnum);
+
+	DestroyParallelContext(pcxt);
+	ExitParallelMode();
+
+	indstats = (IndexBulkDeleteResult **)
+		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
+
+	/* Do post-vacuum cleanup and statistics update for each index */
+	for (i = 0; i < nindexes; i++)
+		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+}
+
+/*
+ * Entry function for a parallel vacuum worker. Each worker calculates the
+ * starting block number and the number of blocks it needs to process, and
+ * then vacuums that particular block range of the relation.
+ */
+static void
+vacuum_worker(dsm_segment *seg, shm_toc *toc)
+{
+	VacuumTask *task;
+	LVRelStats *wstats_space;
+	LVRelStats *wstats;
+	Relation	rel;
+	BlockNumber begin;
+	BlockNumber nblocks_per_worker;
+	BlockNumber nblocks;
+	int			nindexes;
+	Relation   *Irel;
+
+	/* Set up task information */
+	task = (VacuumTask *) shm_toc_lookup(toc, VACUUM_KEY_TASK);
+	OldestXmin = task->oldestxmin;
+	FreezeLimit = task->freezelimit;
+	MultiXactCutoff = task->multixactcutoff;
+
+	/* Look up this worker's slot in the shared stats space */
+	wstats_space = (LVRelStats *) shm_toc_lookup(toc, VACUUM_KEY_WORKER_STATS);
+	wstats = wstats_space + ParallelWorkerNumber;
+
+	/* Calculate how many blocks the worker should process */
+	rel = heap_open(task->relid, NoLock);
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &Irel);
+	nblocks_per_worker = RelationGetNumberOfBlocks(rel) / parallel_vacuum_workers;
+	begin = nblocks_per_worker * ParallelWorkerNumber;
+
+	/* The last worker processes the remaining blocks */
+	if (ParallelWorkerNumber == (task->wnum - 1))
+		nblocks = RelationGetNumberOfBlocks(rel) - begin;
+	else
+		nblocks = nblocks_per_worker;
+
+	/* Set up elevel */
+	elevel = task->elevel;
+
+	/* Vacuum the assigned block range */
+	lazy_scan_heap(rel, task->options, wstats, Irel, nindexes,
+				   task->aggressive, begin, nblocks);
+
+	heap_close(rel, NoLock);
+	vac_close_indexes(nindexes, Irel, NoLock);
+}
+
+/*
+ * lazy_scan_heap() -- scan a particular block range of an open heap relation
  *
  *	This routine prunes each page in the heap, which will among other
  *	things truncate dead tuples to dead line pointers, defragment the
@@ -445,10 +566,10 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
  */
 static void
 lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
-			   Relation *Irel, int nindexes, bool aggressive)
+			   Relation *Irel, int nindexes, bool aggressive,
+			   BlockNumber begin, BlockNumber nblocks)
 {
-	BlockNumber nblocks,
-				blkno;
+	BlockNumber blkno;
 	HeapTupleData tuple;
 	char	   *relname;
 	BlockNumber empty_pages,
@@ -471,14 +592,15 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		PROGRESS_VACUUM_MAX_DEAD_TUPLES
 	};
 	int64		initprog_val[3];
+	BlockNumber end = begin + nblocks;
 
 	pg_rusage_init(&ru0);
 
 	relname = RelationGetRelationName(onerel);
 	ereport(elevel,
-			(errmsg("vacuuming \"%s.%s\"",
+			(errmsg("vacuuming \"%s.%s\", from block %u to %u, %u blocks",
 					get_namespace_name(RelationGetNamespace(onerel)),
-					relname)));
+					relname, begin, end, nblocks)));
 
 	empty_pages = vacuumed_pages = 0;
 	num_tuples = tups_vacuumed = nkeep = nunused = 0;
@@ -486,7 +608,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	indstats = (IndexBulkDeleteResult **)
 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
-	nblocks = RelationGetNumberOfBlocks(onerel);
 	vacrelstats->rel_pages = nblocks;
 	vacrelstats->scanned_pages = 0;
 	vacrelstats->nonempty_pages = 0;
@@ -545,10 +666,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	 * the last page.  This is worth avoiding mainly because such a lock must
 	 * be replayed on any hot standby, where it can be disruptive.
 	 */
-	next_unskippable_block = 0;
+	next_unskippable_block = begin;
 	if ((options & VACOPT_DISABLE_PAGE_SKIPPING) == 0)
 	{
-		while (next_unskippable_block < nblocks)
+		while (next_unskippable_block < end)
 		{
 			uint8		vmstatus;
 
@@ -574,7 +695,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	else
 		skipping_blocks = false;
 
-	for (blkno = 0; blkno < nblocks; blkno++)
+	for (blkno = begin; blkno < end; blkno++)
 	{
 		Buffer		buf;
 		Page		page;
@@ -1306,10 +1427,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
 
-	/* Do post-vacuum cleanup and statistics update for each index */
-	for (i = 0; i < nindexes; i++)
-		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
-
 	/* If no indexes, make log report that lazy_vacuum_heap would've made */
 	if (vacuumed_pages)
 		ereport(elevel,
@@ -1317,6 +1434,13 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 						RelationGetRelationName(onerel),
 						tups_vacuumed, vacuumed_pages)));
 
+	/* Do post-vacuum cleanup and statistics update for each index */
+	if (!IsParallelWorker())
+	{
+		for (i = 0; i < nindexes; i++)
+			lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
+	}
+
 	/*
 	 * This is pretty messy, but we split it up so that we can skip emitting
 	 * individual parts of the message when not applicable.
@@ -1347,6 +1471,29 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 	pfree(buf.data);
 }
 
+/*
+ * gather_vacuum_stats() -- Gather vacuum statistics from the workers
+ */
+static void
+gather_vacuum_stats(LVRelStats *vacrelstats, LVRelStats *worker_stats, int wnum)
+{
+	int			i;
+
+	/* Gather each worker's stats */
+	for (i = 0; i < wnum; i++)
+	{
+		LVRelStats *wstats = worker_stats + i;
+
+		vacrelstats->rel_pages += wstats->rel_pages;
+		vacrelstats->scanned_pages += wstats->scanned_pages;
+		vacrelstats->pinskipped_pages += wstats->pinskipped_pages;
+		vacrelstats->frozenskipped_pages += wstats->frozenskipped_pages;
+		vacrelstats->scanned_tuples += wstats->scanned_tuples;
+		vacrelstats->new_rel_tuples += wstats->new_rel_tuples;
+		vacrelstats->pages_removed += wstats->pages_removed;
+		vacrelstats->nonempty_pages += wstats->nonempty_pages;
+	}
+}
 
 /*
  * lazy_vacuum_heap() -- second pass over the heap
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c5178f7..0dd64bc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2661,6 +2661,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"parallel_vacuum_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Sets the number of parallel workers for vacuum."),
+			NULL
+		},
+		&parallel_vacuum_workers,
+		1, 1, 1024,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
 			gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
 			NULL,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 80cd4a8..fc46c09 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -147,6 +147,45 @@ typedef struct VacuumParams
 								 * activated, -1 to use default */
 } VacuumParams;
 
+typedef struct LVRelStats
+{
+	/* hasindex = true means two-pass strategy; false means one-pass */
+	bool		hasindex;
+	/* Overall statistics about rel */
+	BlockNumber old_rel_pages;	/* previous value of pg_class.relpages */
+	BlockNumber rel_pages;		/* total number of pages */
+	BlockNumber scanned_pages;	/* number of pages we examined */
+	BlockNumber pinskipped_pages;	/* # of pages we skipped due to a pin */
+	BlockNumber frozenskipped_pages;	/* # of frozen pages we skipped */
+	double		scanned_tuples; /* counts only tuples on scanned pages */
+	double		old_rel_tuples; /* previous value of pg_class.reltuples */
+	double		new_rel_tuples; /* new estimated total # of tuples */
+	double		new_dead_tuples;	/* new estimated total # of dead tuples */
+	BlockNumber pages_removed;
+	double		tuples_deleted;
+	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+	/* List of TIDs of tuples we intend to delete */
+	/* NB: this list is ordered by TID address */
+	int			num_dead_tuples;	/* current # of entries */
+	int			max_dead_tuples;	/* # slots allocated in array */
+	ItemPointer dead_tuples;	/* array of ItemPointerData */
+	int			num_index_scans;
+	TransactionId latestRemovedXid;
+	bool		lock_waiter_detected;
+} LVRelStats;
+
+typedef struct VacuumTask
+{
+	Oid			relid;			/* Target relation oid */
+	bool		aggressive;		/* Does each worker need to do aggressive vacuum? */
+	int			options;		/* Specified vacuum options */
+	TransactionId oldestxmin;	/* OldestXmin computed by the leader */
+	TransactionId freezelimit;	/* FreezeLimit computed by the leader */
+	MultiXactId multixactcutoff;	/* MultiXactCutoff computed by the leader */
+	int			wnum;			/* Number of parallel vacuum workers */
+	int			elevel;			/* Message level used by the leader */
+} VacuumTask;
+
 /* GUC parameters */
 extern PGDLLIMPORT int default_statistics_target;		/* PGDLLIMPORT for
														 * PostGIS */
@@ -154,7 +193,7 @@ extern int	vacuum_freeze_min_age;
 extern int	vacuum_freeze_table_age;
 extern int	vacuum_multixact_freeze_min_age;
 extern int	vacuum_multixact_freeze_table_age;
-
+extern int	parallel_vacuum_workers;
 
 /* in commands/vacuum.c */
 extern void ExecVacuum(VacuumStmt *vacstmt, bool isTopLevel);
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 90fcbd7..4f9d986 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -15,6 +15,7 @@
 #ifndef BUFMGR_INTERNALS_H
 #define BUFMGR_INTERNALS_H
 
+#include "lib/ilist.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/latch.h"
-- 
2.8.1
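Note (not part of the patch): the block-range partitioning performed by vacuum_worker() can be illustrated in isolation. The standalone sketch below is hypothetical; worker_block_range() only mirrors the begin/nblocks arithmetic above, assuming each of the wnum workers takes rel_nblocks / wnum blocks and the last worker also picks up the remainder left by the integer division.

#include <stdio.h>

typedef unsigned int BlockNumber;

/*
 * Illustrative only: compute the block range one worker would vacuum,
 * mirroring the arithmetic in vacuum_worker().  The final worker takes
 * whatever remainder the integer division leaves over.
 */
static void
worker_block_range(BlockNumber rel_nblocks, int wnum, int worker_number,
				   BlockNumber *begin, BlockNumber *nblocks)
{
	BlockNumber nblocks_per_worker = rel_nblocks / wnum;

	*begin = nblocks_per_worker * worker_number;
	if (worker_number == wnum - 1)
		*nblocks = rel_nblocks - *begin;	/* last worker takes the rest */
	else
		*nblocks = nblocks_per_worker;
}

int
main(void)
{
	int			i;

	/* A 1000-block relation split across 3 workers: 333 + 333 + 334 blocks */
	for (i = 0; i < 3; i++)
	{
		BlockNumber begin;
		BlockNumber nblocks;

		worker_block_range(1000, 3, i, &begin, &nblocks);
		printf("worker %d: begin=%u nblocks=%u\n", i, begin, nblocks);
	}
	return 0;
}

Since the GUC entry is declared PGC_USERSET, it should be settable per session (e.g. SET parallel_vacuum_workers = 4; before running VACUUM); that usage is inferred from the guc.c entry rather than stated anywhere in the patch itself.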