From 163ae827097b5c2956fafa6c6884b58e3a53ae3b Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Tue, 21 Feb 2017 20:14:44 +0100 Subject: [PATCH 2/5] Don't use on disk snapshots for snapshot export in logical decoding We store historical snapshots on disk to enable continuation of logical decoding after restart. These snapshots were also used bu slot initialiation code for initial snapshot that the slot exports to aid synchronization of data copy and the stream consumption. However these snapshots are only useful for catalogs and not for normal user tables. So when we exported such snapshots for user to read data from tables that is consistent with a specific LSN of slot creation, user would instead read wrong data. This patch changes the code so that stored snapshots are not used when slot creation needs full snapshot. --- src/backend/replication/logical/logical.c | 10 +++++++--- src/backend/replication/logical/snapbuild.c | 19 +++++++++++++------ src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 7 ++++++- src/include/replication/logical.h | 1 + src/include/replication/snapbuild.h | 3 ++- 6 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 58e1c80..79c1dd7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -114,6 +114,7 @@ static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = - AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn); + AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, + need_full_snapshot); ctx->reorder->private_data = ctx; @@ -210,6 +212,7 @@ StartupDecodingContext(List *output_plugin_options, LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -294,7 +297,8 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - read_page, prepare_write, do_write); + need_full_snapshot, read_page, prepare_write, + do_write); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -383,7 +387,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, } ctx = StartupDecodingContext(output_plugin_options, - start_lsn, InvalidTransactionId, + start_lsn, InvalidTransactionId, false, read_page, prepare_write, do_write); /* call output plugin initialization callback */ diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 2279604..ada618d 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,6 +165,9 @@ struct SnapBuild */ TransactionId initial_xmin_horizon; + /* Indicates if we are building full snapshot or just catalog one .*/ + bool building_full_snapshot; + /* * Snapshot that's valid to see the catalog state seen at this moment. */ @@ -281,7 +284,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, - XLogRecPtr start_lsn) + XLogRecPtr start_lsn, + bool need_full_snapshot) { MemoryContext context; MemoryContext oldcontext; @@ -308,6 +312,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; + builder->building_full_snapshot = need_full_snapshot; MemoryContextSwitchTo(oldcontext); @@ -1233,7 +1238,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * * a) There were no running transactions when the xl_running_xacts record * was inserted, jump to CONSISTENT immediately. We might find such a - * state we were waiting for b) and c). + * state we were waiting for b) or c). * * b) Wait for all toplevel transactions that were running to end. We * simply track the number of in-progress toplevel transactions and @@ -1248,7 +1253,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * at all. * * c) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. + * snapshot to disk that we can use. We can't use this method for the + * initial snapshot when slot is being created and needs full snapshot + * for export or direct use. * --- */ @@ -1303,13 +1310,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* c) valid on disk state */ - else if (SnapBuildRestore(builder, lsn)) + /* c) valid on disk state and not full snapshot */ + else if (!builder->building_full_snapshot && + SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ return false; } - /* * b) first encounter of a useable xl_running_xacts record. If we had * found one earlier we would either track running transactions (i.e. diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 7104c94..9775735 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -132,7 +132,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * Create logical decoding context, to build the initial snapshot. */ ctx = CreateInitDecodingContext( - NameStr(*plugin), NIL, + NameStr(*plugin), NIL, false, logical_read_local_xlog_page, NULL, NULL); /* build initial snapshot, might take a while */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index dbb10c7..2784d67 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -873,6 +873,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_LOGICAL) { LogicalDecodingContext *ctx; + bool need_full_snapshot = false; /* * Do options check early so that we can bail before calling the @@ -884,6 +885,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ereport(ERROR, (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT " "must not be called inside a transaction"))); + + need_full_snapshot = true; } else if (snapshot_action == CRS_USE_SNAPSHOT) { @@ -906,9 +909,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ereport(ERROR, (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT " "must not be called in a subtransaction"))); + + need_full_snapshot = true; } - ctx = CreateInitDecodingContext(cmd->plugin, NIL, + ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 7d6c88e..80f04c3 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -82,6 +82,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index a8ae631..494751d 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -54,7 +54,8 @@ struct xl_running_xacts; extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, - TransactionId xmin_horizon, XLogRecPtr start_lsn); + TransactionId xmin_horizon, XLogRecPtr start_lsn, + bool need_full_snapshot); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); -- 2.7.4