diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 22b2a93..16c427e 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8110,9 +8110,10 @@ SCRAM-SHA-256$<iteration count>:&l
This catalog only contains tables and sequences known to the subscription
after running either
- CREATE SUBSCRIPTION
- or ALTER SUBSCRIPTION ... REFRESH
- PUBLICATION or
+ CREATE SUBSCRIPTION or
+
+ ALTER SUBSCRIPTION ... REFRESH PUBLICATION or
+
ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES.
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index bc6d18b..a1ee74b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -490,7 +490,10 @@ HasSubscriptionRelations(Oid subid)
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
- /* If even a single tuple exists then the subscription has tables. */
+ /*
+ * Skip sequence tuples. If even a single table tuple
+ * exists then the subscription has tables.
+ */
if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
{
has_subrels = true;
@@ -508,17 +511,21 @@ HasSubscriptionRelations(Oid subid)
/*
* Get the relations for the subscription.
*
- * all_relations:
- * If returning sequences, if all_relations=true get all sequences,
- * otherwise only get sequences that are in 'init' state.
- * If returning tables, if all_relation=true get all tables, otherwise
+ * get_tables: get relations for tables of the subscription.
+ *
+ * get_sequences: get relations for sequences of the subscription.
+ *
+ * all_states:
+ * If getting tables, if all_states is true get all tables, otherwise
* only get tables that have not reached 'READY' state.
+ * If getting sequences, if all_states is true get all sequences,
+ * otherwise only get sequences that are in 'init' state.
*
* The returned list is palloc'ed in the current memory context.
*/
List *
GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
- bool all_relations)
+ bool all_states)
{
List *res = NIL;
Relation rel;
@@ -527,6 +534,9 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
ScanKeyData skey[2];
SysScanDesc scan;
+ /* One or both of 'get_tables' and 'get_sequences' must be true. */
+ Assert(get_tables || get_sequences);
+
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
ScanKeyInit(&skey[nkeys++],
@@ -535,13 +545,13 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
ObjectIdGetDatum(subid));
/* Get the relations that are not in ready state */
- if (get_tables && !all_relations)
+ if (get_tables && !all_states)
ScanKeyInit(&skey[nkeys++],
Anum_pg_subscription_rel_srsubstate,
BTEqualStrategyNumber, F_CHARNE,
CharGetDatum(SUBREL_STATE_READY));
/* Get the sequences that are in init state */
- else if (get_sequences && !all_relations)
+ else if (get_sequences && !all_states)
ScanKeyInit(&skey[nkeys++],
Anum_pg_subscription_rel_srsubstate,
BTEqualStrategyNumber, F_CHAREQ,
@@ -561,11 +571,11 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
subreltype = get_rel_relkind(subrel->srrelid);
- /* If only tables were requested, skip the sequences */
+ /* Skip sequences if they were not requested */
if (subreltype == RELKIND_SEQUENCE && !get_sequences)
continue;
- /* If only sequences were requested, skip the tables */
+ /* Skip tables if they were not requested */
if (subreltype != RELKIND_SEQUENCE && !get_tables)
continue;
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index e8bd53c..2e63925 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -942,11 +942,11 @@ lastval(PG_FUNCTION_ARGS)
* it is the only way to clear the is_called flag in an existing
* sequence.
*
- * logcnt is currently used only by sequence syncworker to set the log_cnt for
- * sequences while synchronization of sequence values from the publisher.
+ * log_cnt is currently used only by the sequence syncworker to set the
+ * log_cnt for sequences while synchronizing values from the publisher.
*/
void
-do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt)
+do_setval(Oid relid, int64 next, bool is_called, int64 log_cnt)
{
SeqTable elm;
Relation seqrel;
@@ -997,7 +997,7 @@ do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt)
(long long) minv, (long long) maxv)));
- /* Set the currval() state only if iscalled = true */
+ /* Set the currval() state only if is_called = true */
- if (iscalled)
+ if (is_called)
{
elm->last = next; /* last returned number */
elm->last_valid = true;
@@ -1014,8 +1014,8 @@ do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt)
START_CRIT_SECTION();
seq->last_value = next; /* last fetched number */
- seq->is_called = iscalled;
- seq->log_cnt = (logcnt == SEQ_LOG_CNT_INVALID) ? 0: logcnt;
+ seq->is_called = is_called;
+ seq->log_cnt = (log_cnt == SEQ_LOG_CNT_INVALID) ? 0: log_cnt;
MarkBufferDirty(buf);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 984f72d..1c01a2b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -735,9 +735,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
char *err;
WalReceiverConn *wrconn;
- List *tables;
- ListCell *lc;
- char table_state;
bool must_use_password;
/* Try to connect to the publisher. */
@@ -752,7 +749,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY();
{
+ List *tables;
List *sequences;
+ char table_state;
check_publications(wrconn, publications);
check_publications_origin(wrconn, publications, opts.copy_data,
@@ -769,9 +768,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* info.
*/
tables = fetch_table_list(wrconn, publications);
- foreach(lc, tables)
+ foreach_ptr(RangeVar, rv, tables)
{
- RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
@@ -884,9 +882,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* sequences that have been added or removed since the last subscription
* creation or publication refresh.
*
- * If 'all_relations' is true, it will mark all objects with "init" state
- * for re-synchronization; otherwise, only the newly added tables and
- * sequences will be updated based on the copy_data parameter.
+ * If 'all_relations' is true, mark all objects with "init" state
+ * for re-synchronization; otherwise, only update the newly added tables and
+ * sequences based on the copy_data parameter.
*/
static void
AlterSubscription_refresh(Subscription *sub, bool copy_data,
@@ -984,12 +982,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
{
RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
+ char relkind;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
/* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
+ relkind = get_rel_relkind(relid);
+ CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname);
pubrel_local_oids[off++] = relid;
@@ -1001,7 +1000,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
InvalidXLogRecPtr, true);
ereport(DEBUG1,
(errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
- get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table",
+ relkind == RELKIND_SEQUENCE ? "sequence" : "table",
rv->schemaname, rv->relname, sub->name)));
}
}
@@ -1086,7 +1085,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
ereport(DEBUG1,
(errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"",
- get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table",
+ relkind == RELKIND_SEQUENCE ? "sequence" : "table",
get_namespace_name(get_rel_namespace(relid)),
get_rel_name(relid),
sub->name)));
@@ -1116,6 +1115,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
*/
for (off = 0; off < remove_rel_len; off++)
{
+ /* Skip sequence relations. */
if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE)
continue;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 04d76e7..5da5529 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -236,30 +236,27 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
* subscription id, relid and type.
- *
- * We are only interested in the leader apply worker, table sync worker, or
- * sequence sync worker.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType type,
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
+ Assert(wtype == WORKERTYPE_TABLESYNC ||
+ wtype == WORKERTYPE_SEQUENCESYNC ||
+ wtype == WORKERTYPE_APPLY);
+
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
- /* Search for attached worker for a given subscription id. */
+ /* Search for the attached worker matching the specified criteria. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- /* Skip parallel apply workers. */
- if (isParallelApplyWorker(w))
- continue;
-
if (w->in_use && w->subid == subid && w->relid == relid &&
- w->type == type && (!only_running || w->proc))
+ w->type == wtype && (!only_running || w->proc))
{
res = w;
break;
@@ -626,13 +623,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
* Stop the logical replication worker for subid/relid, if any.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType type)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, type, false);
+ worker = logicalrep_worker_find(subid, relid, wtype, false);
if (worker)
{
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index fc36bf9..9aef45a 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -32,9 +32,11 @@
/*
* fetch_remote_sequence_data
*
- * Retrieve the last_value, log_cnt, page_lsn and is_called of the sequence
- * from the remote node. The last_value will be returned directly, while
- * log_cnt, is_called and page_lsn will be provided through the output
+ * Retrieve sequence data (last_value, log_cnt, page_lsn and is_called)
+ * from the remote node.
+ *
+ * The sequence last_value will be returned directly, while
+ * log_cnt, is_called and page_lsn will be returned via the output
* parameters log_cnt, is_called and lsn, respectively.
*/
static int64
@@ -46,7 +48,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname,
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[4] = {INT8OID, INT8OID, BOOLOID, LSNOID};
- int64 value = (Datum) 0;
+ int64 last_value = (Datum) 0;
bool isnull;
initStringInfo(&cmd);
@@ -70,7 +72,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname,
errmsg("sequence \"%s.%s\" not found on publisher",
nspname, relname)));
- value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+ last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
*log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
@@ -86,23 +88,24 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname,
walrcv_clear_result(res);
- return value;
+ return last_value;
}
/*
* Copy existing data of a sequence from publisher.
*
* Fetch the sequence value from the publisher and set the subscriber sequence
- * with the retrieved value. Caller is responsible for locking the local
+ * with the same value. Caller is responsible for locking the local
* relation.
*/
static XLogRecPtr
copy_sequence(WalReceiverConn *conn, Relation rel)
{
StringInfoData cmd;
- int64 sequence_value;
- int64 log_cnt;
- XLogRecPtr lsn = InvalidXLogRecPtr;
+ int64 seq_last_value;
+ int64 seq_log_cnt;
+ bool seq_is_called;
+ XLogRecPtr seq_lsn = InvalidXLogRecPtr;
WalRcvExecResult *res;
Oid tableRow[] = {OIDOID, CHAROID};
TupleTableSlot *slot;
@@ -111,7 +114,6 @@ copy_sequence(WalReceiverConn *conn, Relation rel)
bool isnull;
char *nspname = get_namespace_name(RelationGetNamespace(rel));
char *relname = RelationGetRelationName(rel);
- bool is_called;
/* Fetch Oid. */
initStringInfo(&cmd);
@@ -148,14 +150,14 @@ copy_sequence(WalReceiverConn *conn, Relation rel)
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- sequence_value = fetch_remote_sequence_data(conn, remoteid, nspname,
- relname, &log_cnt, &is_called,
- &lsn);
+ seq_last_value = fetch_remote_sequence_data(conn, remoteid, nspname,
+ relname, &seq_log_cnt, &seq_is_called,
+ &seq_lsn);
- do_setval(RelationGetRelid(rel), sequence_value, is_called, log_cnt);
+ do_setval(RelationGetRelid(rel), seq_last_value, seq_is_called, seq_log_cnt);
/* return the LSN when the sequence state was set */
- return lsn;
+ return seq_lsn;
}
/*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3e162b9..6e7ed8e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -680,7 +680,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* apply process (currently, all that have state SUBREL_STATE_INIT) and manage
* synchronization for them.
*
- * If there is a sequence synchronization worker running already, no need to
+ * If a sequence synchronization worker is running already, there is no need to
* start a new one; the existing sequence sync worker will synchronize all the
* sequences. If there are still any sequences to be synced after the sequence
* sync worker exited, then a new sequence sync worker can be started in the
@@ -697,7 +697,7 @@ process_syncing_sequences_for_apply()
Assert(!IsTransactionState());
/*
- * Start sequence sync worker if there is not one already.
+ * Start a sequence sync worker if one is needed and none is running already.
*/
foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready)
{
@@ -753,6 +753,7 @@ process_syncing_sequences_for_apply()
now, wal_retrieve_retry_interval))
{
MyLogicalRepWorker->sequencesync_failure_time = 0;
+
logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC,
MyLogicalRepWorker->dbid,
MySubscription->oid,
@@ -1689,16 +1690,14 @@ copy_table_done:
/*
* Common code to fetch the up-to-date sync state info into the static lists.
*
- * Copy tables that are not ready into table_states_not_ready and sequences
- * that are not ready into sequence_states_not_ready. The pg_subscription_rel
- * table is shared between sequences and tables. Because changes to either
- * sequences or relations can affect the validity of relation states, we update
- * both table_states_not_ready and sequence_states_not_ready simultaneously
- * to ensure consistency, rather than updating them separately. Returns true if
- * subscription has 1 or more tables, else false.
+ * Copy tables that are not in READY state into table_states_not_ready, and
+ * sequences that are in INIT state into sequence_states_not_ready. The
+ * pg_subscription_rel catalog is shared by tables and sequences. Changes to
+ * either sequences or tables can affect the validity of relation states, so we
+ * update both table_states_not_ready and sequence_states_not_ready
+ * simultaneously to ensure consistency.
*
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * Returns true if subscription has 1 or more tables, else false.
*/
static bool
FetchTableStates(void)
@@ -1728,7 +1727,7 @@ FetchTableStates(void)
}
/*
- * Fetch the tables that are in non-ready state and the sequences that
+ * Fetch tables that are in non-ready state, and sequences that
* are in init state.
*/
rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 71d8c76..b81f496 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -62,7 +62,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
-extern void do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt);
+extern void do_setval(Oid relid, int64 next, bool is_called, int64 log_cnt);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *record);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8a12ecb..6b201d6 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -242,7 +242,7 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
- LogicalRepWorkerType type,
+ LogicalRepWorkerType wtype,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running,
bool acquire_lock);