diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index f0819d15ab7..07d03498e17 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -44,6 +44,7 @@ #include "catalog/pg_tablespace.h" #include "commands/comment.h" #include "commands/dbcommands.h" +#include "commands/sequence.h" #include "commands/dbcommands_xlog.h" #include "commands/defrem.h" #include "commands/seclabel.h" @@ -108,6 +109,7 @@ typedef struct CreateDBRelInfo { RelFileLocator rlocator; /* physical relation identifier */ Oid reloid; /* relation oid */ + char relkind; /* relation kind */ bool permanent; /* relation is permanent or unlogged */ } CreateDBRelInfo; @@ -218,6 +220,10 @@ CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, /* Copy relation storage from source to the destination. */ CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent); + /* Reset log_cnt for sequences to ensure WAL on first nextval() */ + if (relinfo->relkind == RELKIND_SEQUENCE) + ResetSequenceLogCnt(dstrlocator, relinfo->permanent); + /* Release the relation locks. */ UnlockRelationId(&srcrelid, AccessShareLock); UnlockRelationId(&dstrelid, AccessShareLock); @@ -442,6 +448,7 @@ ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid, relinfo->rlocator.dbOid = dbid; relinfo->rlocator.relNumber = relfilenumber; relinfo->reloid = classForm->oid; + relinfo->relkind = classForm->relkind; /* Temporary relations were rejected above. */ Assert(classForm->relpersistence != RELPERSISTENCE_TEMP); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 551667650ba..3f47826680d 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -1042,6 +1042,60 @@ SetSequence(Oid relid, int64 next, bool iscalled) sequence_close(seqrel, NoLock); } +/* + * Reset log_cnt of a sequence that has been copied by CREATE DATABASE + * with the WAL_LOG strategy to zero. This forces the next nextval() call + * to emit a WAL record, ensuring that the standby's last_value is + * at least as large as the primary's value in streaming replication. + */ +void +ResetSequenceLogCnt(RelFileLocator rlocator, bool permanent) +{ + Buffer buf; + Page page; + ItemId lp; + HeapTupleData seqtuple; + Form_pg_sequence_data seq; + + buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, 0, + RBM_NORMAL, NULL, permanent); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + page = BufferGetPage(buf); + lp = PageGetItemId(page, FirstOffsetNumber); + Assert(ItemIdIsNormal(lp)); + + seqtuple.t_data = (HeapTupleHeader) PageGetItem(page, lp); + seqtuple.t_len = ItemIdGetLength(lp); + seq = (Form_pg_sequence_data) GETSTRUCT(&seqtuple); + + if (seq->log_cnt != 0) + { + START_CRIT_SECTION(); + seq->log_cnt = 0; + MarkBufferDirty(buf); + + if (permanent) + { + xl_seq_rec xlrec; + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); + + xlrec.locator = rlocator; + XLogRegisterData(&xlrec, sizeof(xl_seq_rec)); + XLogRegisterData((char *) seqtuple.t_data, seqtuple.t_len); + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); + + PageSetLSN(page, recptr); + } + END_CRIT_SECTION(); + } + + UnlockReleaseBuffer(buf); +} + /* * Implement the 2 arg setval procedure. * See SetSequence for discussion. diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 2c3c4a3f074..2b9289f1ff2 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -13,6 +13,7 @@ #ifndef SEQUENCE_H #define SEQUENCE_H +#include "storage/relfilelocator.h" #include "catalog/objectaddress.h" #include "fmgr.h" #include "nodes/parsenodes.h" @@ -49,5 +50,6 @@ extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); extern void SetSequence(Oid relid, int64 next, bool iscalled); extern void ResetSequenceCaches(void); +extern void ResetSequenceLogCnt(RelFileLocator rlocator, bool permanent); #endif /* SEQUENCE_H */