From 5307ba8e8def7417822acf0aeaf76d0926b68462 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Fri, 13 Jan 2023 20:48:33 +0100
Subject: [PATCH 1/6] Logical decoding of sequences

This extends the logical decoding to also decode sequence increments.
We differentiate between sequences created in the current (in-progress)
transaction, and sequences created earlier. This mixed behavior is
necessary because while sequences are not transactional (increments are
not subject to ROLLBACK), relfilenode changes are. So we do this:

* Changes for sequences created in the same top-level transaction are
  treated as transactional, i.e. just like any other change from that
  transaction, and discarded in case of a rollback.

* Changes for sequences created earlier are applied immediately, as if
  performed outside any transaction. This applies also after ALTER
  SEQUENCE, which may create a new relfilenode.

Moreover, if we ever get support for DDL replication, the sequence
won't exist until the transaction gets applied.

Sequences created in the current transaction are tracked in a simple
hash table, identified by a relfilenode. That means a sequence may
already exist, but if a transaction does ALTER SEQUENCE then the
increments for the new relfilenode will be treated as transactional.

For each relfilenode we track the XID of (sub)transaction that created
it, which is needed for cleanup at transaction end. We don't need to
check the XID to decide if an increment is transactional - if we find a
match in the hash table, it has to be the same transaction.

This requires two minor changes to WAL-logging. Firstly, we need to
ensure the sequence record has a valid XID - until now the increment
might have XID 0 if it was the first change in a subxact. But the
sequence might have been created in the same top-level transaction. So
we ensure the XID is assigned when WAL-logging increments.

The other change is addition of "created" flag, marking increments for
newly created relfilenodes. This makes it easier to maintain the hash
table of sequences that need transactional handling.
Note: This is needed because of subxacts. A XID 0 might still have the
sequence created in a different subxact of the same top-level xact.

This does not include any changes to test_decoding and/or the built-in
replication - those will be committed in separate patches.

A patch adding decoding of sequences was originally submitted by Cary
Huang. This commit reworks various important aspects (e.g. the WAL
logging and transactional/non-transactional handling). However, the
original patch and reviews were very useful.

Author: Tomas Vondra, Cary Huang

Reviewed-by: Ashutosh Bapat, Amit Kapila, Peter Eisentraut, Hannu
Krosing, Andres Freund
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
Discussion: https://postgr.es/m/76e5fcd8-8072-8ea2-d361-2e811941000c@enterprisedb.com
---
 doc/src/sgml/logicaldecoding.sgml             |  59 ++-
 src/backend/commands/sequence.c               |  73 +++
 src/backend/replication/logical/decode.c      | 193 ++++++++
 src/backend/replication/logical/logical.c     | 100 +++-
 .../replication/logical/reorderbuffer.c       | 433 ++++++++++++++++++
 src/include/access/rmgrlist.h                 |   4 +-
 src/include/replication/decode.h              |   2 +
 src/include/replication/logical.h             |   5 +
 src/include/replication/output_plugin.h       |  23 +
 src/include/replication/reorderbuffer.h       |  43 +-
 10 files changed, 924 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index cbd3aa804f..b58f53a600 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -487,6 +487,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeSequenceCB sequence_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
@@ -501,6 +502,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeStreamCommitCB stream_commit_cb;
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
+    LogicalDecodeStreamSequenceCB stream_sequence_cb;
     LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
 
@@ -509,10 +511,13 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      The <function>begin_cb</function>, <function>change_cb</function>
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>, <function>truncate_cb</function>,
-     <function>message_cb</function>, <function>filter_by_origin_cb</function>,
+     <function>message_cb</function>, <function>sequence_cb</function>,
+     <function>filter_by_origin_cb</function>,
      and <function>shutdown_cb</function> are optional.
      If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
+     Similarly, if <function>sequence_cb</function> is not set and a sequence
+     change is to be decoded, the action will be ignored.
     </para>
 
     <para>
@@ -520,7 +525,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      in-progress transactions. The <function>stream_start_cb</function>,
      <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
      <function>stream_commit_cb</function>, and <function>stream_change_cb</function>
-     are required, while <function>stream_message_cb</function> and
+     are required, while <function>stream_message_cb</function>,
+     <function>stream_sequence_cb</function> and
      <function>stream_truncate_cb</function> are optional. The
      <function>stream_prepare_cb</function> is also required if the output
      plugin also support two-phase commits.
@@ -839,6 +845,32 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-sequence">
+     <title>Sequence Callback</title>
+
+     <para>
+      The optional <function>sequence_cb</function> callback is called for
+      actions that update a sequence value.
+<programlisting>
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         XLogRecPtr sequence_lsn,
+                                         Relation rel,
+                                         bool transactional,
+                                         int64 value);
+</programlisting>
+      The <parameter>txn</parameter> parameter contains meta information about
+      the transaction the sequence change is part of. Note however that for
+      non-transactional increments, the transaction may be either NULL or not
+      NULL, depending on if the transaction already has an XID assigned.
+      The <parameter>sequence_lsn</parameter> has the WAL location of the
+      sequence update. <parameter>transactional</parameter> says if the
+      sequence has to be replayed as part of the transaction or directly.
+
+      The <parameter>value</parameter> parameter describes the sequence change.
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
      <title>Prepare Filter Callback</title>
 
@@ -1050,6 +1082,24 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-stream-sequence">
+     <title>Stream Sequence Callback</title>
+     <para>
+      The optional <function>stream_sequence_cb</function> callback is called
+      for actions that change a sequence in a block of streamed changes
+      (demarcated by <function>stream_start_cb</function> and
+      <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr sequence_lsn,
+                                               Relation rel,
+                                               bool transactional,
+                                               int64 value);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-stream-truncate">
      <title>Stream Truncate Callback</title>
      <para>
@@ -1230,8 +1280,9 @@ OutputPluginWrite(ctx, true);
     in-progress transactions. There are multiple required streaming callbacks
     (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
     <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
-    and <function>stream_change_cb</function>) and two optional callbacks
-    (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+    and <function>stream_change_cb</function>) and multiple optional callbacks
+    (<function>stream_message_cb</function>, <function>stream_sequence_cb</function>,
+    and <function>stream_truncate_cb</function>).
     Also, if streaming of two-phase commands is to be supported, then additional
     callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
     for details).
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index fc4f77e787..418cdfdec1 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -217,6 +217,10 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 	stmt->tablespacename = NULL;
 	stmt->if_not_exists = seq->if_not_exists;
 
+	/* make sure the relfilenode creation is associated with the XID */
+	if (XLogLogicalInfoActive())
+		GetCurrentTransactionId();
+
 	address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL);
 	seqoid = address.objectId;
 	Assert(seqoid != InvalidOid);
@@ -315,6 +319,10 @@ ResetSequence(Oid seq_relid)
 	seq->is_called = false;
 	seq->log_cnt = 0;
 
+	/* make sure the relfilenode creation is associated with the XID */
+	if (XLogLogicalInfoActive())
+		GetCurrentTransactionId();
+
 	/*
 	 * Create a new storage file for the sequence.
 	 */
@@ -405,8 +413,19 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(rel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Make sure the subtransaction has a XID assigned, so that the sequence
+		 * increment WAL record is properly associated with it. This matters for
+		 * increments of sequences created/altered in the transaction, which are
+		 * handled as transactional.
+		 */
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+	}
+
 	START_CRIT_SECTION();
 
 	MarkBufferDirty(buf);
@@ -430,6 +449,9 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
 
+		/* allow filtering by origin on a sequence update */
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
@@ -512,6 +534,10 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 		if (RelationNeedsWAL(seqrel))
 			GetTopTransactionId();
 
+		/* make sure the relfilenode creation is associated with the XID */
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+
 		/*
 		 * Create a new storage file for the sequence, making the state
 		 * changes transactional.
@@ -561,8 +587,14 @@ SequenceChangePersistence(Oid relid, char newrelpersistence)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/* make sure the relfilenode creation is associated with the XID */
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+	}
+
 	(void) read_seq_tuple(seqrel, &buf, &seqdatatuple);
 	RelationSetNewRelfilenumber(seqrel, newrelpersistence);
 	fill_seq_with_data(seqrel, &seqdatatuple);
@@ -805,10 +837,34 @@ nextval_internal(Oid relid, bool check_permissions)
 	 * It's sufficient to ensure the toplevel transaction has an xid, no need
 	 * to assign xids subxacts, that'll already trigger an appropriate wait.
 	 * (Have to do that here, so we're outside the critical section)
+	 *
+	 * We have to ensure we have a proper XID, which will be included in
+	 * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
+	 * in a subxact (without any preceding changes) would get XID 0, and it
+	 * would then be impossible to decide which top xact it belongs to.
+	 * It'd also trigger assert in DecodeSequence. We only do that with
+	 * wal_level=logical, though.
+	 *
+	 * XXX This might seem unnecessary, because if there's no XID the xact
+	 * couldn't have done anything important yet, e.g. it could not have
+	 * created a sequence. But that's incorrect, because of subxacts. The
+	 * current subtransaction might not have done anything yet (thus no XID),
+	 * but an earlier one might have created the sequence.
 	 */
 	if (logit && RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Make sure the subtransaction has a XID assigned, so that the sequence
+		 * increment WAL record is properly associated with it. This matters for
+		 * increments of sequences created/altered in the transaction, which are
+		 * handled as transactional.
+		 */
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+	}
+
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
 
@@ -848,6 +904,9 @@ nextval_internal(Oid relid, bool check_permissions)
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
+		/* allow filtering by origin on a sequence update */
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
@@ -1009,8 +1068,19 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
+	{
 		GetTopTransactionId();
 
+		/*
+		 * Make sure the subtransaction has a XID assigned, so that the sequence
+		 * increment WAL record is properly associated with it. This matters for
+		 * increments of sequences created/altered in the transaction, which are
+		 * handled as transactional.
+		 */
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+	}
+
 	/* ready to change the on-disk (or really, in-buffer) tuple */
 	START_CRIT_SECTION();
 
@@ -1034,6 +1104,9 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
+		/* allow filtering by origin on a sequence update */
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7039d425e2..f2fb4a2f24 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
 #include "access/xlogrecord.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "catalog/storage_xlog.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
@@ -42,6 +43,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "commands/sequence.h"
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -63,6 +65,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -1289,3 +1292,193 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
+
+/*
+ * DecodeSeqTuple
+ *		decode tuple describing the sequence increment
+ *
+ * Sequences are represented as a table with a single row, which gets updated
+ * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
+ * simply copy it into the tuplebuf (similar to seq_redo).
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+	int			datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+	Assert(datalen >= 0);
+
+	tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	memcpy(((char *) tuple->tuple.t_data),
+		   data + sizeof(xl_seq_rec),
+		   SizeofHeapTupleHeader);
+
+	memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+		   data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+		   datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+void
+sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	ReorderBufferTupleBuf *tuplebuf;
+	RelFileLocator target_locator;
+	XLogReaderState *r = buf->record;
+	char	   *tupledata = NULL;
+	Size		tuplelen;
+	Size		datalen = 0;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+	Snapshot	snapshot = NULL;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	bool		transactional;
+
+	/* ignore sequences when the plugin does not have the callbacks */
+	if (!ctx->sequences)
+		return;
+
+	/* only decode changes flagged with XLOG_SEQ_LOG */
+	if (info != XLOG_SEQ_LOG)
+		elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	if (target_locator.dbOid != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	tupledata = XLogRecGetData(r);
+	datalen = XLogRecGetDataLen(r);
+	tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+	/* the sequence should not have changed without data */
+	if(!datalen || !tupledata)
+		elog(ERROR, "sequence decode missing tuple data");
+
+	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+	DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+	/*
+	 * Should we handle the sequence increment as transactional or not?
+	 *
+	 * If the relfilenode was created in the current transaction, treat
+	 * it as transactional and queue the increments. Otherwise it needs
+	 * to be treated as non-transactional, in which case we send it to
+	 * the plugin right away.
+	 */
+	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+														 target_locator);
+
+	/* Skip the change if already processed (per the snapshot). */
+	if (transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+	else if (!transactional &&
+			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+		return;
+
+	/* Queue the increment (or send immediately if not transactional). */
+	if (!transactional)
+		snapshot = SnapBuildGetOrBuildSnapshot(builder);
+
+	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+							   origin_id, target_locator, transactional,
+							   tuplebuf);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding changes to sequences is a bit tricky, because while most sequence
+ * actions are non-transactional (not subject to rollback), some need to be
+ * handled as transactional.
+ *
+ * By default, a sequence change is non-transactional - we must not queue it in
+ * a transaction as other changes, because the transaction might get rolled back
+ * and we'd discard the increment. The downstream would not be notified about
+ * the increment, which is wrong.
+ *
+ * On the other hand, the relfilenode associated with the sequence may be
+ * changed in a transaction. In this case we *should* queue the change as other
+ * changes in the transaction, because we don't want to send the sequence
+ * changes, which may be rolled back, to the plugin before the transaction is
+ * committed.
+ */
+void
+smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+	xl_smgr_create *xlrec;
+
+	/*
+	 * Bail out when not decoding sequences, which is currently the only case
+	 * when we need to know about relfilenodes created in a transaction.
+	 */
+	if (!ctx->sequences)
+		return;
+
+	/* Also, we only care about XLOG_SMGR_CREATE. */
+	if (info != XLOG_SMGR_CREATE)
+		return;
+
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* only interested in our database */
+	xlrec = (xl_smgr_create *) XLogRecGetData(r);
+	if (xlrec->rlocator.dbOid != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	ReorderBufferAddRelFileLocator(ctx->reorder, xid, xlrec->rlocator);
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..aa9886a8e4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+								XLogRecPtr sequence_lsn, Relation rel,
+								bool transactional, int64 value);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +93,9 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
 static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									  XLogRecPtr message_lsn, bool transactional,
 									  const char *prefix, Size message_size, const char *message);
+static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									   XLogRecPtr sequence_lsn, Relation rel,
+									   bool transactional, int64 value);
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
@@ -221,13 +227,14 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->sequence = sequence_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
-	 * callbacks. The message and truncate callbacks are optional, similar to
-	 * regular output plugins. We however enable streaming when at least one
-	 * of the methods is enabled so that we can easily identify missing
-	 * methods.
+	 * callbacks. The message, sequence and truncate callbacks are optional,
+	 * similar to regular output plugins. We however enable streaming when at
+	 * least one of the methods is enabled so that we can easily identify
+	 * missing methods.
 	 *
 	 * We decide it here, but only check it later in the wrappers.
 	 */
@@ -237,6 +244,7 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_commit_cb != NULL) ||
 		(ctx->callbacks.stream_change_cb != NULL) ||
 		(ctx->callbacks.stream_message_cb != NULL) ||
+		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
 	/*
@@ -254,6 +262,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
 	ctx->reorder->stream_change = stream_change_cb_wrapper;
 	ctx->reorder->stream_message = stream_message_cb_wrapper;
+	ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
 	ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
 
@@ -287,6 +296,13 @@ StartupDecodingContext(List *output_plugin_options,
 	 */
 	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
 
+	/*
+	 * To support logical decoding of sequences, we require the sequence
+	 * callback. We decide it here, but only check it later in the wrappers.
+	 */
+	ctx->sequences = ((ctx->callbacks.sequence_cb != NULL) ||
+					  (ctx->callbacks.stream_sequence_cb != NULL));
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -1260,6 +1276,42 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+					XLogRecPtr sequence_lsn, Relation rel, bool transactional,
+					int64 value)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.sequence_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "sequence";
+	state.report_location = sequence_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = sequence_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel,
+							   transactional, value);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
@@ -1575,6 +1627,46 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr sequence_lsn, Relation rel,
+						   bool transactional, int64 value)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (ctx->callbacks.stream_sequence_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_sequence";
+	state.report_location = sequence_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = sequence_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel,
+							   transactional, value);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						   int nrelations, Relation relations[],
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 26d252bd87..69d52b6f39 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -77,6 +77,39 @@
  *	  a bit more memory to the oldest subtransactions, because it's likely
  *	  they are the source for the next sequence of changes.
  *
+ *	  When decoding sequences, we differentiate between changes to sequences
+ *	  with a new relfilenode in the currently decoded transaction, and
+ *	  sequences created in other (already committed) transactions. First case
+ *	  is treated as "transactional" i.e. just like any other change from that
+ *	  transaction (and discarded in case of a rollback). Second case is treated
+ *	  as non-transactional and the changes are processed immediately, as if
+ *	  performed outside any transaction (and thus not rolled back).
+ *
+ *	  This mixed behavior is necessary - sequences are non-transactional
+ *	  (e.g. ROLLBACK does not undo the sequence increments). But for new
+ *	  sequences, we need to handle them in a transactional way, because if
+ *	  we ever get some DDL support, the sequence won't exist until the
+ *	  transaction gets applied. So we need to ensure the changes don't
+ *	  happen until the sequence gets created.
+ *
+ *	  Similarly when a new relfilenode is assigned to a sequence (because of
+ *	  some sort of DDL) the changs do not become visible till the transaction
+ *	  gets committed. If the transaction gets aborted it will never be visible.
+ *
+ *	  To decide if a sequence change is transactional, we track relfilenodes
+ *	  created in current transactions, with the XID of the (sub)transaction
+ *	  that created the relfilenode. The list of sequence relfilenodes gets
+ *	  cleaned up when a transaction completes (commit/rollback).
+ *
+ *	  We don't use the XID to check if it's the same top-level transaction.
+ *	  It's enough to know it was created in the transaction being decoded, and
+ *	  we know it must be the current one because otherwise it wouldn't see the
+ *	  sequence object.
+ *
+ *	  The XID may be valid even for non-transactional sequences - we simply
+ *	  keep the XID logged to WAL, it's up to the reorderbuffer to decide if
+ *	  the increment is transactional.
+ *
  * -------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -91,6 +124,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "commands/sequence.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -116,6 +150,13 @@ typedef struct ReorderBufferTXNByIdEnt
 	ReorderBufferTXN *txn;
 } ReorderBufferTXNByIdEnt;
 
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+	RelFileLocator	rlocator;
+	TransactionId	xid;
+} ReorderBufferSequenceEnt;
+
 /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
 typedef struct ReorderBufferTupleCidKey
 {
@@ -349,6 +390,14 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/* hash table of sequences, mapping relfilelocator to XID of transaction */
+	hash_ctl.keysize = sizeof(RelFileLocator);
+	hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -536,6 +585,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				change->data.truncate.relids = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+				change->data.sequence.tuple = NULL;
+			}
+			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -895,6 +951,253 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Treat the sequence change as transactional?
+ *
+ * The hash table tracks all sequences which are assigned a new relfilenode in
+ * the transaction being decoded, so we simply do a lookup (the sequence is
+ * identified by relfilende). If we find a match, the change should be handled
+ * as transactional.
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+									 RelFileLocator rlocator)
+{
+	bool	found = false;
+
+	hash_search(rb->sequences,
+				(void *) &rlocator,
+				HASH_FIND,
+				&found);
+
+	return found;
+}
+
+/*
+ * Cleanup sequences created in in-progress transactions.
+ *
+ * There's no way to search by XID, so we simply do a seqscan of all
+ * the entries in the hash table. Hopefully there are only a couple
+ * entries in most cases - people generally don't create many new
+ * sequences over and over.
+ */
+static void
+ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
+{
+	HASH_SEQ_STATUS scan_status;
+	ReorderBufferSequenceEnt *ent;
+
+	hash_seq_init(&scan_status, rb->sequences);
+	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+	{
+		/* skip sequences not from this transaction */
+		if (ent->xid != xid)
+			continue;
+
+		(void) hash_search(rb->sequences,
+					   (void *) &(ent->rlocator),
+					   HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be both transactional and non-transactional. When
+ * created in a running transaction, treat it as transactional and queue
+ * the change in it. Otherwise treat it as non-transactional, so that we
+ * don't forget the increment in case of a rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+						   RelFileLocator rlocator, bool transactional,
+						   ReorderBufferTupleBuf *tuplebuf)
+{
+	/*
+	 * Change needs to be handled as transactional, because the sequence was
+	 * created in a transaction that is still running. In that case all the
+	 * changes need to be queued in that transaction, we must not send them
+	 * to the downstream until the transaction commits.
+	 *
+	 * There's a bit of a trouble with subtransactions - we can't queue it
+	 * into the subxact, because it might be rolled back and we'd lose the
+	 * increment. We need to queue it into the same (sub)xact that created
+	 * the sequence, which is why we track the XID in the hash table.
+	 */
+	if (transactional)
+	{
+		MemoryContext oldcontext;
+		ReorderBufferChange *change;
+		ReorderBufferSequenceEnt *ent;
+		bool	found;
+
+		/* allocate and queue the transactional sequence change */
+		oldcontext = MemoryContextSwitchTo(rb->context);
+
+		change = ReorderBufferGetChange(rb);
+
+		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+		change->origin_id = origin_id;
+
+		memcpy(&change->data.sequence.locator, &rlocator, sizeof(RelFileLocator));
+
+		change->data.sequence.tuple = tuplebuf;
+
+		/*
+		 * Lookup XID of the transaction where to queue the change (the one
+		 * that did the ALTER SEQUENCE etc.)
+		 */
+		ent = hash_search(rb->sequences,
+						  (void *) &rlocator,
+						  HASH_FIND,
+						  &found);
+
+		Assert(found);
+
+		/* add it to the same subxact that created the sequence */
+		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	else
+	{
+		/*
+		 * This increment is for a sequence that was not created in any
+		 * running transaction, so we treat it as non-transactional and
+		 * just send it to the output plugin directly.
+		 */
+		ReorderBufferTXN *txn = NULL;
+		volatile Snapshot snapshot_now = snapshot;
+		bool	using_subtxn;
+
+		/* non-transactional changes require a valid snapshot */
+		Assert(snapshot_now);
+
+#ifdef USE_ASSERT_CHECKING
+		/* Make sure the sequence is not in the hash table. */
+		{
+			bool	found;
+			hash_search(rb->sequences,
+						(void *) &rlocator,
+						HASH_FIND, &found);
+			Assert(!found);
+		}
+#endif
+
+		if (xid != InvalidTransactionId)
+			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+		/* setup snapshot to allow catalog access */
+		SetupHistoricSnapshot(snapshot_now, NULL);
+
+		/*
+		 * Decoding needs access to syscaches et al., which in turn use
+		 * heavyweight locks and such. Thus we need to have enough state around to
+		 * keep track of those.  The easiest way is to simply use a transaction
+		 * internally.  That also allows us to easily enforce that nothing writes
+		 * to the database by checking for xid assignments.
+		 *
+		 * When we're called via the SQL SRF there's already a transaction
+		 * started, so start an explicit subtransaction there.
+		 */
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		PG_TRY();
+		{
+			Relation	relation;
+			HeapTuple	tuple;
+			Form_pg_sequence_data seq;
+			Oid			reloid;
+			int64		value;
+
+			if (using_subtxn)
+				BeginInternalSubTransaction("sequence");
+			else
+				StartTransactionCommand();
+
+			reloid = RelidByRelfilenumber(rlocator.spcOid, rlocator.relNumber);
+
+			if (reloid == InvalidOid)
+				elog(ERROR, "could not map filenode \"%s\" to relation OID",
+					 relpathperm(rlocator,
+								 MAIN_FORKNUM));
+
+			relation = RelationIdGetRelation(reloid);
+
+			if (!RelationIsValid(relation))
+				elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+					 reloid,
+					 relpathperm(rlocator, MAIN_FORKNUM));
+
+			tuple = &tuplebuf->tuple;
+			seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+
+			/*
+			 * Calculate the first value of the next batch (at which point we
+			 * generate and decode another WAL record.
+			 */
+			value = seq->last_value;
+			value += (seq->is_called) ? seq->log_cnt : 0;
+
+			rb->sequence(rb, txn, lsn, relation, transactional, value);
+
+			RelationClose(relation);
+
+			TeardownHistoricSnapshot(false);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+		}
+		PG_CATCH();
+		{
+			TeardownHistoricSnapshot(true);
+
+			AbortCurrentTransaction();
+
+			if (using_subtxn)
+				RollbackAndReleaseCurrentSubTransaction();
+
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
+/*
+ * ReorderBufferAddRelFileLocator
+ *		Add newly created relfilenode to the global hash table.
+ */
+void
+ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+							   RelFileLocator rlocator)
+{
+	/* lookup sequence by relfilelocator */
+	ReorderBufferSequenceEnt   *ent;
+	bool						found;
+
+	/* sequence changes require a transaction */
+	if (xid == InvalidTransactionId)
+		return;
+
+	/* search the lookup table */
+	ent = hash_search(rb->sequences,
+					  (void *) &rlocator,
+					  HASH_ENTER,
+					  &found);
+
+	/*
+	 * We've just decoded creation of the relfilenode, so if we found it in
+	 * the hash table, something is wrong.
+	 */
+	Assert(!found);
+
+	ent->xid = xid;
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1585,6 +1888,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
 	Assert(found);
 
+	/* Remove sequences created in this transaction (if any). */
+	ReorderBufferSequenceCleanup(rb, txn->xid);
+
 	/* remove entries spilled to disk */
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
@@ -1997,6 +2303,35 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						   Relation relation, ReorderBufferChange *change,
+						   bool streaming)
+{
+	HeapTuple	tuple;
+	Form_pg_sequence_data seq;
+	int64		value;
+
+	tuple = &change->data.sequence.tuple->tuple;
+	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+
+	value = seq->last_value;
+	value += (seq->is_called) ? seq->log_cnt : 0;
+
+	/*
+	 * When called from ReorderBufferApplySequence, we're applying changes
+	 * accumulated in a ReorderBufferTXN, so all those are transactional
+	 * changes of sequences.
+	 */
+	if (streaming)
+		rb->stream_sequence(rb, txn, change->lsn, relation, true, value);
+	else
+		rb->sequence(rb, txn, change->lsn, relation, true, value);
+}
+
 /*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
@@ -2442,6 +2777,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_SEQUENCE:
+					Assert(snapshot_now);
+
+					reloid = RelidByRelfilenumber(change->data.sequence.locator.spcOid,
+												  change->data.sequence.locator.relNumber);
+
+					if (reloid == InvalidOid)
+						elog(ERROR, "could not map filenode \"%s\" to relation OID",
+							 relpathperm(change->data.sequence.locator,
+										 MAIN_FORKNUM));
+
+					relation = RelationIdGetRelation(reloid);
+
+					if (!RelationIsValid(relation))
+						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+							 reloid,
+							 relpathperm(change->data.sequence.locator,
+										 MAIN_FORKNUM));
+
+					if (RelationIsLogicallyLogged(relation))
+						ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+					RelationClose(relation);
+					break;
 			}
 
 			/*
@@ -3909,6 +4269,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				memcpy(data, change->data.truncate.relids, size);
 				data += size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				char	   *data;
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				if (len)
+				{
+					memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+					data += sizeof(HeapTupleData);
+
+					memcpy(data, tup->tuple.t_data, len);
+					data += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4173,6 +4566,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 			{
 				sz += sizeof(Oid) * change->data.truncate.nrelids;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			{
+				ReorderBufferTupleBuf *tup;
+				Size		len = 0;
+
+				tup = change->data.sequence.tuple;
+
+				if (tup)
+				{
+					sz += sizeof(HeapTupleData);
+					len = tup->tuple.t_len;
+					sz += len;
+				}
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4476,6 +4885,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
+
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.tuple)
+			{
+				uint32		tuplelen = ((HeapTuple) data)->t_len;
+
+				change->data.sequence.tuple =
+					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+				/* restore ->tuple */
+				memcpy(&change->data.sequence.tuple->tuple, data,
+					   sizeof(HeapTupleData));
+				data += sizeof(HeapTupleData);
+
+				/* reset t_data pointer into the new tuplebuf */
+				change->data.sequence.tuple->tuple.t_data =
+					ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+				/* restore tuple data itself */
+				memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+				data += tuplelen;
+			}
+			break;
+
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 463bcb67c5..afd1d35221 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -27,7 +27,7 @@
 /* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
 PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
 PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, smgr_decode)
 PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
@@ -40,7 +40,7 @@ PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog
 PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
 PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
 PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, sequence_decode)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 14fa921ab4..e077a2b3c7 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -27,6 +27,8 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..e4699e04fc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -90,6 +90,11 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase;
 
+	/*
+	 * Does the output pluging support decoding of sequence increments?
+	 */
+	bool		sequences;
+
 	/*
 	 * Is two-phase option given by output plugin?
 	 *
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 3ac6729386..75327645d2 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -90,6 +90,16 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for the generic logical decoding sequences.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional,
+										 int64 value);
+
 /*
  * Filter changes by origin.
  */
@@ -201,6 +211,17 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
 											  Size message_size,
 											  const char *message);
 
+/*
+ * Called for the streaming generic logical decoding sequences from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr sequence_lsn,
+											   Relation rel,
+											   bool transactional,
+											   int64 value);
+
 /*
  * Callback for streaming truncates from in-progress transactions.
  */
@@ -221,6 +242,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
@@ -239,6 +261,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeStreamCommitCB stream_commit_cb;
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
+	LogicalDecodeStreamSequenceCB stream_sequence_cb;
 	LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1b9db22acb..1f3a39c311 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -73,7 +73,8 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_SEQUENCE
 } ReorderBufferChangeType;
 
 /* forward declaration */
@@ -167,6 +168,13 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
+
+		/* Context data for Sequence changes */
+		struct
+		{
+			RelFileLocator locator;
+			ReorderBufferTupleBuf *tuple;
+		}			sequence;
 	}			data;
 
 	/*
@@ -470,6 +478,14 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr sequence_lsn,
+										 Relation rel,
+										 bool transactional,
+										 int64 value);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -536,6 +552,14 @@ typedef void (*ReorderBufferStreamMessageCB) (
 											  const char *prefix, Size sz,
 											  const char *message);
 
+/* stream sequence callback signature */
+typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr sequence_lsn,
+											   Relation rel,
+											   bool transactional,
+											   int64 value);
+
 /* stream truncate callback signature */
 typedef void (*ReorderBufferStreamTruncateCB) (
 											   ReorderBuffer *rb,
@@ -557,6 +581,12 @@ struct ReorderBuffer
 	 */
 	HTAB	   *by_txn;
 
+	/*
+	 * relfilenode => XID lookup table for sequences created in a transaction
+	 * (also includes altered sequences, which assigns new relfilenode)
+	 */
+	HTAB	   *sequences;
+
 	/*
 	 * Transactions that could be a toplevel xact, ordered by LSN of the first
 	 * record bearing that xid.
@@ -592,6 +622,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferSequenceCB sequence;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -611,6 +642,7 @@ struct ReorderBuffer
 	ReorderBufferStreamCommitCB stream_commit;
 	ReorderBufferStreamChangeCB stream_change;
 	ReorderBufferStreamMessageCB stream_message;
+	ReorderBufferStreamSequenceCB stream_sequence;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
 	/*
@@ -696,6 +728,10 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+extern void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+									   RelFileLocator locator, bool transactional,
+									   ReorderBufferTupleBuf *tuplebuf);
 extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
@@ -750,4 +786,9 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
+extern void ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+										   RelFileLocator rlocator);
+extern bool	ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+												 RelFileLocator locator);
+
 #endif
-- 
2.41.0

