From 6abc059aaddcdc0e368a9e2c8dec49a09819cbef Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 18 Jul 2018 11:32:55 -0400 Subject: [PATCH] replslot advance comment updates --- src/backend/replication/logical/logical.c | 5 ++- src/backend/replication/slotfuncs.c | 75 ++++++++++++++++++++----------- 2 files changed, 52 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 61588d626f..c9bbdcda74 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin, * that, see below). * * output_plugin_options - * contains options passed to the output plugin. + * options passed to the output plugin. + * + * fast_forward + * bypass the generation of logical changes. * * read_page, prepare_write, do_write, update_progress * callbacks that have to be filled to perform the use-case dependent, diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 23af32355b..08d20a9470 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -318,9 +318,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) /* * Helper function for advancing physical replication slot forward. - * The LSN position to move to is compared simply to the slot's - * restart_lsn, knowing that any position older than that would be - * removed by successive checkpoints. + * + * The LSN position to move to is compared simply to the slot's restart_lsn, + * knowing that any position older than that would be removed by successive + * checkpoints. */ static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto) @@ -341,67 +342,87 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) /* * Helper function for advancing logical replication slot forward. + * * The slot's restart_lsn is used as start point for reading records, * while confirmed_lsn is used as base point for the decoding context. - * The LSN position to move to is checked by doing a per-record scan and - * logical decoding which makes sure that confirmed_lsn is updated to a - * LSN which allows the future slot consumer to get consistent logical - * changes. + * + * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush, + * because we need to digest WAL to advance restart_lsn allowing to recycle + * WAL and removal of old catalog tuples. As decoding is done in fast_forward + * mode, no changes are generated anyway. */ static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; - XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush; + XLogRecPtr startlsn; + XLogRecPtr retlsn; PG_TRY(); { - /* restart at slot's confirmed_flush */ + /* + * Create our decoding context in fast_forward mode, passing start_lsn + * as Invalid, so that we start processing from confirmed_flush. + */ ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, - true, + true, /* fast_forward */ logical_read_local_xlog_page, NULL, NULL, NULL); - CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, - "logical decoding"); + /* + * Start reading at the slot's restart_lsn, which we know to point to + * a valid record. + */ + startlsn = MyReplicationSlot->data.restart_lsn; + + /* Initialize our return value in case we don't do anything */ + retlsn = MyReplicationSlot->data.confirmed_flush; /* invalidate non-timetravel entries */ InvalidateSystemCaches(); - /* Decode until we run out of records */ - while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) || - (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto)) + /* Decode at least one record, until we run out of records */ + while ((!XLogRecPtrIsInvalid(startlsn) && + startlsn < moveto) || + (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) && + ctx->reader->EndRecPtr < moveto)) { - XLogRecord *record; char *errm = NULL; + bool gotrecord; - record = XLogReadRecord(ctx->reader, startlsn, &errm); + /* + * Read records. No changes are generated in fast_forward mode, + * but snapbuilder/slot statuses are updated properly. + */ + gotrecord = XLogReadRecord(ctx->reader, startlsn, &errm) != NULL; if (errm) elog(ERROR, "%s", errm); - /* - * Now that we've set up the xlog reader state, subsequent calls - * pass InvalidXLogRecPtr to say "continue from last record" - */ + /* Read sequentially from now on */ startlsn = InvalidXLogRecPtr; /* - * The {begin_txn,change,commit_txn}_wrapper callbacks above will - * store the description into our tuplestore. + * Process the record. Storage-level changes are ignored in + * fast_forward mode, but other modules (such as snapbuilder) + * might still have critical updates to do. */ - if (record != NULL) + if (gotrecord) LogicalDecodingProcessRecord(ctx, ctx->reader); - /* Stop once the moving point wanted by caller has been reached */ + /* Stop once the requested target has been reached */ if (moveto <= ctx->reader->EndRecPtr) break; CHECK_FOR_INTERRUPTS(); } + /* + * Logical decoding could have clobbered CurrentResourceOwner during + * transaction management, so restore the executor's value. (This is + * a kluge, but it's not worth cleaning up right now.) + */ CurrentResourceOwner = old_resowner; if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) @@ -409,7 +430,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) LogicalConfirmReceivedLocation(moveto); /* - * If only the confirmed_flush_lsn has changed the slot won't get + * If only the confirmed_flush LSN has changed the slot won't get * marked as dirty by the above. Callers on the walsender * interface are expected to keep track of their own progress and * don't need it written out. But SQL-interface users cannot -- 2.11.0