diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 9162286c98..d7a04539d6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -82,6 +82,8 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; +bool logical_replication = true; + /* * When running as a parallel worker, we place only a single * TransactionStateData on the parallel worker's state stack, and the XID @@ -5535,6 +5537,9 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + if (!logical_replication) + xl_xinfo.xinfo |= XACT_XINFO_LOGIREPL_OFF; + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c53e7e2279..b11c89ff74 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -618,6 +618,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + (parsed->xinfo & XACT_XINFO_LOGIREPL_OFF) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 90ffd89339..0880a2765f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1952,6 +1952,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"logical_replication", PGC_USERSET, REPLICATION_MASTER, + gettext_noop("Close logical replication of transactions in the session."), + NULL + }, + &logical_replication, + true, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/include/access/xact.h b/src/include/access/xact.h index d714551704..1a7f52ac50 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -81,6 +81,9 @@ typedef enum /* Synchronous commit level */ extern int synchronous_commit; +/* turn on/off logical replication */ +extern bool logical_replication; + /* * Miscellaneous flag bits to record events which occur on the top level * transaction. These flags are only persisted in MyXactFlags and are intended @@ -168,6 +171,12 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) #define XACT_XINFO_HAS_GID (1U << 7) +/* + * It indicates that the data affected by this transaction does not need + * to be included in the logical replication. + */ +#define XACT_XINFO_LOGIREPL_OFF (1U << 28) + /* * Also stored in xinfo, these indicating a variety of additional actions that * need to occur when emulating transaction effects during recovery.