Re: Synchronizing slots from primary to standby

From: shveta malik <shveta(dot)malik(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-10-09 10:34:44
Message-ID: CAJpy0uDTJFWFzz3=Qo75AG2SugjQ3dvVkOJChii=1vfHfnjWuQ@mail.gmail.com
Lists: pgsql-hackers

On Wed, Oct 4, 2023 at 8:53 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Here are some review comments for v20-0002.
>

Thanks, Peter, for the feedback. Comments 31 through the end are addressed
in v22. The first 30 comments will be addressed in the next version.

> ======
> 1. GENERAL - errmsg/elog messages
>
> There are a lot of minor problems and/or quirks across all the
> message texts. Here is a summary of some I found:
>
> ERROR
> errmsg("could not receive list of slots from the primary server: %s",
> errmsg("invalid response from primary server"),
> errmsg("invalid connection string syntax: %s",
> errmsg("replication slot-sync worker slot %d is empty, cannot attach",
> errmsg("replication slot-sync worker slot %d is already used by
> another worker, cannot attach",
> errmsg("replication slot-sync worker slot %d is already used by
> another worker, cannot attach",
> errmsg("could not connect to the primary server: %s",
>
> errmsg("operation not permitted on replication slots on standby which
> are synchronized from primary")));
> /primary/the primary/
>
> errmsg("could not fetch invalidation cuase for slot \"%s\" from primary: %s",
> /cuase/cause/
> /primary/the primary/
>
> errmsg("slot \"%s\" disapeared from the primary",
> /disapeared/disappeared/
>
> errmsg("could not fetch slot info from the primary: %s",
> errmsg("could not connect to the primary server: %s", err)));
> errmsg("could not map dynamic shared memory segment for slot-sync worker")));
>
> errmsg("physical replication slot %s found in synchronize_slot_names",
> slot name not quoted?
> ---
>
> WARNING
> errmsg("out of background worker slots"),
>
> errmsg("Replication slot-sync worker failed to attach to worker-pool slot %d",
> case?
>
> errmsg("Removed database %d from replication slot-sync worker %d;
> dbcount now: %d",
> case?
>
> errmsg("Skipping slots synchronization as primary_slot_name is not set."));
> case?
>
> errmsg("Skipping slots synchronization as hot_standby_feedback is off."));
> case?
>
> errmsg("Skipping slots synchronization as dbname is not specified in
> primary_conninfo."));
> case?
>
> errmsg("slot-sync wait for slot %s interrupted by promotion, slot
> creation aborted",
>
> errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
> /primary/the primary/
>
> errmsg("slot \"%s\" disappeared from the primary, aborting slot creation",
> errmsg("slot \"%s\" invalidated on primary, aborting slot creation",
>
> errmsg("slot-sync for slot %s interrupted by promotion, sync not possible",
> slot name not quoted?
>
> errmsg("skipping sync of slot \"%s\" as the received slot-sync lsn
> %X/%X is ahead of the standby position %X/%X",
>
> errmsg("not synchronizing slot %s; synchronization would move it backward",
> slot name not quoted?
> /backward/backwards/
>
> ---
>
> LOG
> errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
> errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
> errmsg("Stopping replication slot-sync worker %d",
> errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin
> (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
>
> errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)and catalog
> xmin (%u) has now passed local slot LSN (%X/%X) and catalog xmin
> (%u)",
> missing spaces?
>
> elog(LOG, "Dropped replication slot \"%s\" ",
> extra space?
> why this one is elog but others are not?
>
> elog(LOG, "Replication slot-sync worker %d is shutting down on
> receiving SIGINT", MySlotSyncWorker->slot);
> case?
> why this one is elog but others are not?
>
> elog(LOG, "Replication slot-sync worker %d started", worker_slot);
> case?
> why this one is elog but others are not?
> ----
>
> DEBUG1
> errmsg("allocated dsa for slot-sync worker for dbcount: %d"
> worker number not given?
> should be elog?
>
> errmsg_internal("logical replication launcher started")
> should be elog?
>
> ----
>
> DEBUG2
> elog(DEBUG2, "slot-sync worker%d's query:%s \n",
> missing space after 'worker'
> extra space before \n
>
> ======
> .../libpqwalreceiver/libpqwalreceiver.c
>
> 2. libpqrcv_get_dbname_from_conninfo
>
> +/*
> + * Get database name from primary conninfo.
> + *
> + * If dbanme is not found in connInfo, return NULL value.
> + * The caller should take care of handling NULL value.
> + */
> +static char *
> +libpqrcv_get_dbname_from_conninfo(const char *connInfo)
>
> 2a.
> /dbanme/dbname/
>
> ~
>
> 2b.
> "The caller should take care of handling NULL value."
>
> IMO this is not very useful; it's like saying "caller must handle
> function return values".
>
> ~~~
>
> 3.
> + for (opt = opts; opt->keyword != NULL; ++opt)
> + {
> + /* Ignore connection options that are not present. */
> + if (opt->val == NULL)
> + continue;
> +
> + if (strcmp(opt->keyword, "dbname") == 0 && opt->val[0] != '\0')
> + {
> + dbname = pstrdup(opt->val);
> + }
> + }
>
> 3a.
> If there are multiple "dbname" in the conninfo then it will be the
> LAST one that is returned.
>
> Judging by my quick syntax experiment (below) this seemed like the
> correct thing to do, but I think there should be a comment to
> explain it.
>
> test_sub=# create subscription sub1 connection 'dbname=foo dbname=bar
> dbname=test_pub' publication pub1;
> 2023-09-28 19:15:15.012 AEST [23997] WARNING: subscriptions created
> by regression test cases should have names starting with "regress_"
> WARNING: subscriptions created by regression test cases should have
> names starting with "regress_"
> NOTICE: created replication slot "sub1" on publisher
> CREATE SUBSCRIPTION
>
> ~
>
> 3b.
> The block brackets {} are not needed for the single statement.
>
> ~
>
> 3c.
> Since there is only one keyword of interest here it seemed overkill to
> have a separate 'continue' check. Why not do everything in one line:
>
> for (opt = opts; opt->keyword != NULL; ++opt)
> {
> if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0')
> dbname = pstrdup(opt->val);
> }
>
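
A minimal sketch of the single-condition loop from 3c, showing why the last
"dbname" wins (3a). ConnOpt and last_dbname are hypothetical stand-ins used
only for illustration; they are not the libpq option type or the patch's
function, and the sketch returns the pointer rather than a pstrdup'd copy.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one parsed conninfo option. */
typedef struct ConnOpt
{
    const char *keyword;    /* a NULL keyword terminates the array */
    const char *val;        /* NULL when the option is not present */
} ConnOpt;

/*
 * Return the value of the last non-empty "dbname" option, or NULL if there
 * is none. The loop deliberately does not stop at the first match, so a
 * later "dbname" overrides an earlier one.
 */
const char *
last_dbname(const ConnOpt *opts)
{
    const char *dbname = NULL;
    const ConnOpt *opt;

    assert(opts != NULL);

    for (opt = opts; opt->keyword != NULL; ++opt)
    {
        if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0')
            dbname = opt->val;
    }

    return dbname;
}
```

With options equivalent to 'dbname=foo dbname=bar dbname=test_pub' this
returns "test_pub", which is the behaviour a code comment could spell out.
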
> ======
> src/backend/replication/logical/launcher.c
>
> 4.
> +/*
> + * The local variables to store the current values of slot-sync related GUCs
> + * before each ConfigReload.
> + */
> +static char *PrimaryConnInfoPreReload = NULL;
> +static char *PrimarySlotNamePreReload = NULL;
> +static char *SyncSlotNamesPreReload = NULL;
>
> /The local variables/Local variables/
>
> ~~~
>
> 5. fwd declare
>
> static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
> +static void slotsync_worker_cleanup(SlotSyncWorker *worker);
> static int logicalrep_pa_worker_count(Oid subid);
>
> 5a.
> Hmm, I think a lot more static functions were added than just this one.
>
> e.g. what about all these?
> static SlotSyncWorker *slotsync_worker_find
> static dsa_handle slotsync_dsa_setup
> static bool slotsync_worker_launch_or_reuse
> static void slotsync_worker_stop_internal
> static void slotsync_workers_stop
> static void slotsync_remove_obsolete_dbs
> static WalReceiverConn *primary_connect
> static void SaveCurrentSlotSyncConfigs
> static bool SlotSyncConfigsChanged
> static void ApplyLauncherStartSlotSync
> static void ApplyLauncherStartSubs
>
> ~
>
> 5b.
> The naming style of the new static functions is inconsistent --
> e.g. snake_case versus CamelCase.
>
> ~~~
>
> 6. WaitForReplicationWorkerAttach
>
> int rc;
> + bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false;
>
> This seemed a hacky way to distinguish the sync-slot workers from
> other kinds of workers. Wouldn't it be better to pass another
> parameter to this function?
>
> ~~~
>
> 7. slotsync_worker_attach
>
> It looks like almost a clone of logicalrep_worker_attach. It seems a
> shame if it cannot make use of common code.
>
> ~~~
>
> 8. slotsync_worker_find
>
> + * Walks the slot-sync workers pool and searches for one that matches given
> + * dbid. Since one worker can manage multiple dbs, so it walks the db array in
> + * each worker to find the match.
>
> 8a.
> SUGGESTION
> Searches the slot-sync worker pool for the worker who manages the
> specified dbid. Because a worker can manage multiple dbs, also walk
> the db array of each worker to find the match.
>
> ~
>
> 8b.
> Should the comment also say something like "Returns NULL if no
> matching worker is found."
>
> ~~~
>
> 9.
> + /* Search for attached worker for a given dbid */
>
> SUGGESTION
> Search for an attached worker managing the given dbid.
>
> ~~~
>
> 10.
> +{
> + int i;
> + SlotSyncWorker *res = NULL;
> + Oid *dbids;
> +
> + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
> +
> + /* Search for attached worker for a given dbid */
> + for (i = 0; i < max_slotsync_workers; i++)
> + {
> + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> + int cnt;
> +
> + if (!w->hdr.in_use)
> + continue;
> +
> + dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
> + for (cnt = 0; cnt < w->dbcount; cnt++)
> + {
> + Oid wdbid = dbids[cnt];
> +
> + if (wdbid == dbid)
> + {
> + res = w;
> + break;
> + }
> + }
> +
> + /* If worker is found, break the outer loop */
> + if (res)
> + break;
> + }
> +
> + return res;
> +}
>
> IMO this logic can be simplified a lot:
> - by not using the 'res' variable; directly return instead.
> - by moving the 'dbids' declaration into the loop.
> - the 'cnt' variable name is not meaningful; replace it with 'dbidx' for
> the db array index IMO.
>
> For example (25 lines instead of 35 lines)
>
> {
> int i;
>
> Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
>
> /* Search for an attached worker managing the given dbid. */
> for (i = 0; i < max_slotsync_workers; i++)
> {
> SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> int dbidx;
> Oid *dbids;
>
> if (!w->hdr.in_use)
> continue;
>
> dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
> for (dbidx = 0; dbidx < w->dbcount; dbidx++)
> {
> if (dbids[dbidx] == dbid)
> return w;
> }
> }
>
> return NULL;
> }
>
> ~~~
>
> 11. slot_sync_dsa_setup
>
> +/*
> + * Setup DSA for slot-sync worker.
> + *
> + * DSA is needed for dbids array. Since max number of dbs a worker can manage
> + * is not known, so initially fixed size to hold DB_PER_WORKER_ALLOC_INIT
> + * dbs is allocated. If this size is exhausted, it can be extended using
> + * dsa free and allocate routines.
> + */
> +static dsa_handle
> +slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count)
>
> 11a.
> SUGGESTION
> DSA is used for the dbids array. Because the maximum number of dbs a
> worker can manage is not known, initially enough memory for
> DB_PER_WORKER_ALLOC_INIT dbs is allocated. If this size is exhausted,
> it can be extended using dsa free and allocate routines.
>
> ~
>
> 11b.
> It doesn't make sense for the comment to say DB_PER_WORKER_ALLOC_INIT
> is the initial allocation, but then the function has a parameter
> 'alloc_db_count' (which is always passed as DB_PER_WORKER_ALLOC_INIT).
> IMO remove the 2nd parameter from this function and hardwire the
> initial allocation to match what the function comment says.
>
> ~~~
>
> 12.
> + /* Be sure any memory allocated by DSA routines is persistent. */
> + oldcontext = MemoryContextSwitchTo(TopMemoryContext);
>
> /Be sure any memory/Ensure the memory/
>
> ~~~
>
> 13. slotsync_worker_launch_or_reuse
>
> +/*
> + * Slot-sync worker launch or reuse
> + *
> + * Start new slot-sync background worker from the pool of available workers
> + * going by max_slotsync_workers count. If the worker pool is exhausted,
> + * reuse the existing worker with minimum number of dbs. The idea is to
> + * always distribute the dbs equally among launched workers.
> + * If initially allocated dbids array is exhausted for the selected worker,
> + * reallocate the dbids array with increased size and copy the existing
> + * dbids to it and assign the new one as well.
> + *
> + * Returns true on success, false on failure.
> + */
>
> /going by/limited by/ (??)
>
> ~~~
>
> 14.
> + BackgroundWorker bgw;
> + BackgroundWorkerHandle *bgw_handle;
> + uint16 generation;
> + SlotSyncWorker *worker = NULL;
> + uint32 mindbcnt = 0;
> + uint32 alloc_count = 0;
> + uint32 copied_dbcnt = 0;
> + Oid *copied_dbids = NULL;
> + int worker_slot = -1;
> + dsa_handle handle;
> + Oid *dbids;
> + int i;
> + bool attach;
>
> IIUC many of these variables can be declared at a different scope in
> this function, so they will be closer to where they are used.
>
> ~~~
>
> 15.
> + /*
> + * We need to do the modification of the shared memory under lock so that
> + * we have consistent view.
> + */
> + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
>
> The current comment seems too much.
>
> SUGGESTION
> The shared memory must only be modified under lock.
>
> ~~~
>
> 16.
> + /* Find unused worker slot. */
> + for (i = 0; i < max_slotsync_workers; i++)
> + {
> + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +
> + if (!w->hdr.in_use)
> + {
> + worker = w;
> + worker_slot = i;
> + break;
> + }
> + }
> +
> + /*
> + * If all the workers are currently in use. Find the one with minimum
> + * number of dbs and use that.
> + */
> + if (!worker)
> + {
> + for (i = 0; i < max_slotsync_workers; i++)
> + {
> + SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
> +
> + if (i == 0)
> + {
> + mindbcnt = w->dbcount;
> + worker = w;
> + worker_slot = i;
> + }
> + else if (w->dbcount < mindbcnt)
> + {
> + mindbcnt = w->dbcount;
> + worker = w;
> + worker_slot = i;
> + }
> + }
> + }
>
> Why not combine these 2 loops, to avoid iterating over the same slots
> twice? Then, exit the loop immediately if unused worker found,
> otherwise if reach the end of loop having not found anything unused
> then you will already know the one having least dbs.
>
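
A minimal sketch of the combined loop described in 16. Worker is a
hypothetical, reduced stand-in for SlotSyncWorker (only the two fields the
selection needs), and pick_worker_slot is an illustrative name, not something
from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced stand-in for the patch's SlotSyncWorker. */
typedef struct Worker
{
    bool        in_use;
    unsigned    dbcount;
} Worker;

/*
 * Return the index of the first unused worker slot. If every slot is in
 * use, return the index of the worker managing the fewest dbs instead.
 */
int
pick_worker_slot(const Worker *workers, int nworkers)
{
    int         min_slot = 0;
    int         i;

    assert(workers != NULL && nworkers > 0);

    for (i = 0; i < nworkers; i++)
    {
        if (!workers[i].in_use)
            return i;           /* unused slot found, stop immediately */

        /* Strict '<' keeps the earliest slot on ties, as the two-loop code did. */
        if (workers[i].dbcount < workers[min_slot].dbcount)
            min_slot = i;
    }

    return min_slot;            /* all busy: least-loaded worker */
}
```

The caller would still need to distinguish "new worker" from "reused worker",
for example by checking in_use on the returned slot.
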
> ~~~
>
> 17.
> + /* Remember the old dbids before we reallocate dsa. */
> + copied_dbcnt = worker->dbcount;
> + copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid));
> + memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid));
>
> 17a.
> Who frees this copied_dbids memory when you are finished with it? It
> seems to be allocated in the TopMemoryContext so IIUC this is a leak.
>
> ~
>
> 17b.
> These are the 'old' values. Not the 'copied' values. The copied_xxx
> variable names seem misleading.
>
> ~~~
>
> 18.
> + /* Prepare the new worker. */
> + worker->hdr.launch_time = GetCurrentTimestamp();
> + worker->hdr.in_use = true;
>
> If a new worker is required then the launch_time is set like above.
>
> + {
> + slot_db_data->last_launch_time = now;
> +
> + slotsync_worker_launch_or_reuse(slot_db_data->database);
> + }
>
> Meanwhile, at the caller of slotsync_worker_launch_or_reuse(), the
> dbid launch_time was already set as well. And those two timestamps are
> almost (but not quite) the same value. Isn't that a bit strange?
>
> ~~~
>
> 19.
> + /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
> + handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT);
> + dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);
> +
> + dbids[worker->dbcount++] = dbid;
>
> Where was this worker->dbcount assigned to 0?
>
> Maybe it's better to do this explicitly under the "/* Prepare the new
> worker. */" comment.
>
> ~~~
>
> 20.
> + if (!attach)
> + ereport(WARNING,
> + (errmsg("Replication slot-sync worker failed to attach to "
> + "worker-pool slot %d", worker_slot)));
> +
> + /* Attach is done, now safe to log that the worker is managing dbid */
> + if (attach)
> + ereport(LOG,
> + (errmsg("Added database %d to replication slot-sync "
> + "worker %d; dbcount now: %d",
> + dbid, worker_slot, worker->dbcount)));
>
> 20a.
> IMO this should be coded as "if (attach) ...; else ..."
>
> ~
>
> 20b.
> In other code if it failed to register then slotsync_worker_cleanup
> code is called. How come similar cleanup is not done when it fails to
> attach?
>
> ~~~
>
> 21. slotsync_worker_stop_internal
>
> +/*
> + * Internal function to stop the slot-sync worker and wait until it detaches
> + * from the slot-sync worker-pool slot.
> + */
> +static void
> +slotsync_worker_stop_internal(SlotSyncWorker *worker)
>
> IIUC this function does a bit more than what the function comment
> says. IIUC (again) I think the "detached" worker slot will still be
> flagged as 'inUse' but this function then does the extra step of
> calling slotsync_worker_cleanup() function to make the worker slot
> available for next process that needs it, am I correct?
>
> In this regard, this function seems a lot more like
> logicalrep_worker_detach() function comment, so there seems some kind
> of muddling of the different function names here... (??).
>
> ~~~
>
> 22. slotsync_remove_obsolete_dbs
>
> This function says:
> +/*
> + * Slot-sync workers remove obsolete DBs from db-list
> + *
> + * If the DBIds fetched from the primary are lesser than the ones being managed
> + * by slot-sync workers, remove extra dbs from worker's db-list. This
> may happen
> + * if some slots are removed on primary but 'synchronize_slot_names' has not
> + * been changed yet.
> + */
> +static void
> +slotsync_remove_obsolete_dbs(List *remote_dbs)
>
> But, there was another function with similar logic too:
>
> +/*
> + * Drop obsolete slots
> + *
> + * Drop the slots which no longer need to be synced i.e. these either
> + * do not exist on primary or are no longer part of synchronize_slot_names.
> + *
> + * Also drop the slots which are valid on primary and got invalidated
> + * on standby due to conflict (say required rows removed on primary).
> + * The assumption is, these will get recreated in next sync-cycle and
> + * it is okay to drop and recreate such slots as long as these are not
> + * consumable on standby (which is the case currently).
> + */
> +static void
> +drop_obsolete_slots(Oid *dbids, List *remote_slot_list)
>
> Those function header comments suggest these have a lot of overlapping
> functionality.
>
> Can't those 2 functions be combined? Or maybe one delegate to the other?
>
> ~~~
>
> 23.
> + ListCell *lc;
> + Oid *dbids;
> + int widx;
> + int dbidx;
> + int i;
>
> Scope of some of these variable declarations can be different so they
> are declared closer to where they are used.
>
> ~~~
>
> 24.
> + /* If not found, then delete this db from worker's db-list */
> + if (!found)
> + {
> + for (i = dbidx; i < worker->dbcount; i++)
> + {
> + /* Shift the DBs and get rid of wdbid */
> + if (i < (worker->dbcount - 1))
> + dbids[i] = dbids[i + 1];
> + }
>
> IIUC, that shift/loop could just have been a memmove() call to remove
> one Oid element.
>
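
A minimal sketch of the memmove() form mentioned in 24. The helper name and
the explicit count decrement are assumptions for illustration; the quoted
hunk does not show where the patch adjusts dbcount.

```c
#include <assert.h>
#include <string.h>

typedef unsigned int Oid;       /* stand-in typedef for this sketch */

/*
 * Remove the element at position idx from dbids[0 .. *count - 1] by
 * shifting the tail down one place, then decrement *count.
 */
void
remove_dbid_at(Oid *dbids, int *count, int idx)
{
    assert(dbids != NULL && count != NULL);
    assert(idx >= 0 && idx < *count);

    /* memmove (not memcpy) because source and destination overlap. */
    memmove(&dbids[idx], &dbids[idx + 1],
            (size_t) (*count - idx - 1) * sizeof(Oid));
    (*count)--;
}
```

When idx is the last element the move length is zero, so only the count
changes.
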
> ~~~
>
> 25.
> + /* If dbcount for any worker has become 0, shut it down */
> + for (widx = 0; widx < max_slotsync_workers; widx++)
> + {
> + SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
> +
> + if (worker->hdr.in_use && !worker->dbcount)
> + slotsync_worker_stop_internal(worker);
> + }
>
> Is it safe to stop this unguarded by SlotSyncWorkerLock locking? Is
> there a window where another dbid decides to reuse this worker at the
> same time this process is about to stop it?
>
> ~~~
>
> 26. primary_connect
>
> +/*
> + * Connect to primary server for slotsync purpose and return the connection
> + * info. Disconnect previous connection if provided in wrconn_prev.
> + */
>
> /primary server/the primary server/
>
> ~~~
>
> 27.
> + if (!RecoveryInProgress())
> + return NULL;
> +
> + if (max_slotsync_workers == 0)
> + return NULL;
> +
> + if (strcmp(synchronize_slot_names, "") == 0)
> + return NULL;
> +
> + /* The primary_slot_name is not set */
> + if (!WalRcv || WalRcv->slotname[0] == '\0')
> + {
> + ereport(WARNING,
> + errmsg("Skipping slots synchronization as primary_slot_name "
> + "is not set."));
> + return NULL;
> + }
> +
> + /* The hot_standby_feedback must be ON for slot-sync to work */
> + if (!hot_standby_feedback)
> + {
> + ereport(WARNING,
> + errmsg("Skipping slots synchronization as hot_standby_feedback "
> + "is off."));
> + return NULL;
> + }
>
> How come some of these checks give a WARNING that slot synchronization
> will be skipped, but others just silently return NULL?
>
> ~~~
>
> 28. SaveCurrentSlotSyncConfigs
>
> +static void
> +SaveCurrentSlotSyncConfigs()
> +{
> + PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo);
> + PrimarySlotNamePreReload = pstrdup(WalRcv->slotname);
> + SyncSlotNamesPreReload = pstrdup(synchronize_slot_names);
> +}
>
> Shouldn't this code also do pfree first? Otherwise these will slowly
> leak every time this function is called, right?
>
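
A minimal sketch of the "release the old copy first" pattern raised in 28,
written with plain malloc/free so it stands alone. save_setting is a
hypothetical helper; backend code would use pstrdup/pfree instead, and would
most likely need an explicit NULL check before pfree on the first call.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Replace *saved with a fresh copy of current, releasing the previous copy
 * so that repeated calls do not accumulate allocations.
 */
void
save_setting(char **saved, const char *current)
{
    size_t      len;
    char       *copy;

    assert(saved != NULL && current != NULL);

    /* Copy before freeing, in case current points at the old saved copy. */
    len = strlen(current) + 1;
    copy = malloc(len);
    if (copy == NULL)
        abort();
    memcpy(copy, current, len);

    free(*saved);               /* free(NULL) is a no-op */
    *saved = copy;
}
```

Calling this once per reload for each of the three saved settings keeps at
most one saved copy of each alive.
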
> ~~~
>
> 29. SlotSyncConfigsChanged
>
> +static bool
> +SlotSyncConfigsChanged()
> +{
> + if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
> + return true;
> +
> + if (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0)
> + return true;
> +
> + if (strcmp(SyncSlotNamesPreReload, synchronize_slot_names) != 0)
> + return true;
>
> I felt those can all be combined to have 1 return instead of 3.
>
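
A minimal sketch of the single-return form suggested in 29. SyncConfig and
sync_config_changed are hypothetical names; the struct only groups the three
strings that the quoted code compares.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical snapshot of the three string settings being compared. */
typedef struct SyncConfig
{
    const char *conninfo;
    const char *slot_name;
    const char *sync_slot_names;
} SyncConfig;

/* Return true if any of the three settings differs between snapshots. */
bool
sync_config_changed(const SyncConfig *before, const SyncConfig *after)
{
    assert(before != NULL && after != NULL);

    return strcmp(before->conninfo, after->conninfo) != 0 ||
        strcmp(before->slot_name, after->slot_name) != 0 ||
        strcmp(before->sync_slot_names, after->sync_slot_names) != 0;
}
```
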
> ~~~
>
> 30.
> + /*
> + * If we have reached this stage, it means original value of
> + * hot_standby_feedback was 'true', so consider it changed if 'false' now.
> + */
> + if (!hot_standby_feedback)
> + return true;
>
> "If we have reached this stage" seems a bit vague. Can this have some
> more explanation? And, maybe also an Assert(hot_standby_feedback); is
> helpful in the calling code (before the config is reloaded)?
>
> ~~~
>
> 31. ApplyLauncherStartSlotSync
>
> + * It connects to primary, get the list of DBIDs for slots configured in
> + * synchronize_slot_names. It then launces the slot-sync workers as per
> + * max_slotsync_workers and then assign the DBs equally to the workers
> + * launched.
> + */
>
> SUGGESTION (fix typos etc)
> Connect to the primary, to get the list of DBIDs for slots configured
> in synchronize_slot_names. Then launch slot-sync workers (limited by
> max_slotsync_workers) where the DBs are distributed equally among
> those workers.
>
> ~~~
>
> 32.
> +static void
> +ApplyLauncherStartSlotSync(long *wait_time, WalReceiverConn *wrconn)
>
> Why does this function even have 'Apply' in the name when it has
> nothing to do with an apply worker? It looks like some cut/paste
> hangover. How about calling it something like 'LaunchSlotSyncWorkers'?
>
> ~~~
>
> 33.
> + /* If connection is NULL due to lack of correct configurations, return */
> + if (!wrconn)
> + return;
>
> IMO it would be better to Assert wrconn in this function. If it is
> NULL then it should be checked at the caller, otherwise it just raises
> more questions -- like "who logged the warning about bad
> configuration" etc (I already questioned the NULL returns of
> primary_connect).
>
> ~~~
>
> 34.
> + if (!OidIsValid(slot_db_data->database))
> + continue;
>
> This represents some kind of integrity error doesn't it? Is it really
> OK just to silently skip such a thing?
>
> ~~~
>
> 35.
> + /*
> + * If the worker is eligible to start now, launch it. Otherwise,
> + * adjust wait_time so that we'll wake up as soon as it can be
> + * started.
> + *
> + * Each apply worker can only be restarted once per
> + * wal_retrieve_retry_interval, so that errors do not cause us to
> + * repeatedly restart the worker as fast as possible.
> + */
>
> 35a.
> I found the "we" part of "so that we'll wake up..." to be a bit
> misleading. There is no waiting in this function; that wait value is
> handed back to the caller to deal with. TBH, I did not really
> understand why it is even necessary to separate the waiting
> calculation *per-worker* like this. It seems to overcomplicate things
> and it might even give results like 1st worker is not started but last
> worker is started (if enough time elapsed in the loop). Why can't all
> this wait logic be done one time up front, and either (a) start all
> necessary workers, or (b) start none of them and wait a bit longer.
>
> ~
>
> 35b.
> "Each apply worker". Why is this talking about "apply" workers? Maybe
> cut/paste error?
>
> ~~~
>
> 36.
> + last_launch_tried = slot_db_data->last_launch_time;
> + now = GetCurrentTimestamp();
> + if (last_launch_tried == 0 ||
> + (elapsed = TimestampDifferenceMilliseconds(last_launch_tried, now)) >=
> + wal_retrieve_retry_interval)
> + {
> + slot_db_data->last_launch_time = now;
> +
> + slotsync_worker_launch_or_reuse(slot_db_data->database);
> + }
> + else
> + {
> + *wait_time = Min(*wait_time,
> + wal_retrieve_retry_interval - elapsed);
> + }
>
> 36a.
> IMO this might be simpler if you add another variable like bool 'launch_now':
>
> last_launch_tried = ...
> now = ...
> elapsed = ...
> launch_now = elapsed >= wal_retrieve_retry_interval;
>
> ~
>
> 36b.
> Do you really care about checking "last_launch_tried == 0"? If it
> really is zero, then I thought the elapsed check should be enough.
>
> ~
>
> 36c.
> Does this 'last_launch_time' really need to be in some shared memory?
> Won't a static variable suffice?
>
>
> ~~~
>
> 37. ApplyLauncherStartSubs
>
> Wouldn't a better name for the function be something like
> 'LaunchSubscriptionApplyWorker'? (it is a better match for the
> suggested LaunchSlotSyncWorkers)
>
>
> ~~~
>
> 38. ApplyLauncherMain
>
> Now that this is not only for Apply worker but also for SlotSync
> workers, maybe this function should be renamed as just LauncherMain,
> or something equally generic?
>
> ~~~
>
> 39.
> + load_file("libpqwalreceiver", false);
> +
> + wrconn = primary_connect(NULL);
> +
>
> This connection did not exist in the HEAD code so I think it is added
> only for the slot-sync logic. IIUC it is still doing nothing for the
> non-slot-sync cases because primary_connect will silently return in
> that case:
>
> + if (!RecoveryInProgress())
> + return NULL;
>
> IMO this is too sneaky, and it is misleading to see the normal apply
> worker launch apparently connecting to something when it is not really
> doing so AFAIK. I think these conditions should be done explicitly here
> at the caller to remove any such ambiguity.
>
> ~~~
>
> 40.
> + if (!RecoveryInProgress())
> + ApplyLauncherStartSubs(&wait_time);
> + else
> + ApplyLauncherStartSlotSync(&wait_time, wrconn);
>
> 40a.
> IMO this is deserving of a comment to explain why RecoveryInProgress
> means to perform the slot-synchronization.
>
> ~
>
> 40b.
> Also, better to have a positive check RecoveryInProgress() instead of
> !RecoveryInProgress()
>
> ~~~
>
> 41.
> if (ConfigReloadPending)
> {
> + bool ssConfigChanged = false;
> +
> + SaveCurrentSlotSyncConfigs();
> +
> ConfigReloadPending = false;
> ProcessConfigFile(PGC_SIGHUP);
> +
> + /*
> + * Stop the slot-sync workers if any of the related GUCs changed.
> + * These will be relaunched as per the new values during next
> + * sync-cycle.
> + */
> + ssConfigChanged = SlotSyncConfigsChanged();
> + if (ssConfigChanged)
> + slotsync_workers_stop();
> +
> + /* Reconnect in case primary_conninfo has changed */
> + wrconn = primary_connect(wrconn);
> }
> }
>
> ~
>
> 41a.
> The 'ssConfigChanged' assignment at declaration is not needed.
> Indeed, the whole variable is not really necessary because it is used
> only once.
>
> ~
>
> 41b.
> /as per the new values/using the new values/
>
> ~
>
> 41c.
> + /* Reconnect in case primary_conninfo has changed */
> + wrconn = primary_connect(wrconn);
>
> To avoid unnecessary reconnections, shouldn't this be done only if
> (ssConfigChanged)?
>
> In fact, assuming the comment is correct, reconnect only if
> (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
>
>
> ======
> src/backend/replication/logical/slotsync.c
>
> 42. wait_for_primary_slot_catchup
>
> + ereport(LOG,
> + errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin"
> + " (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
> + remote_slot->name,
> + LSN_FORMAT_ARGS(remote_slot->restart_lsn),
> + remote_slot->catalog_xmin,
> + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
> + MyReplicationSlot->data.catalog_xmin));
>
> AFAIK it is usual for the LSN format string to be %X/%X (not %u/%X like here).
>
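
A minimal sketch of why %X/%X is the natural format in 42: an LSN is a 64-bit
value printed as its high and low 32-bit halves, both in hexadecimal.
format_lsn is a hypothetical helper written for this illustration; in the
backend the LSN_FORMAT_ARGS macro supplies the two halves.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Format a 64-bit LSN as "high/low" with both halves in hexadecimal.
 * Returns the number of characters snprintf reports.
 */
int
format_lsn(char *buf, size_t buflen, uint64_t lsn)
{
    assert(buf != NULL && buflen > 0);

    return snprintf(buf, buflen, "%X/%X",
                    (unsigned int) (lsn >> 32),
                    (unsigned int) (lsn & 0xFFFFFFFFu));
}
```

Printing the high half with %u would show it in decimal while the low half
stays hexadecimal, which makes the value hard to compare with LSNs shown
elsewhere.
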
> ~~~
>
> 43.
> + appendStringInfo(&cmd,
> + "SELECT restart_lsn, confirmed_flush_lsn, catalog_xmin"
> + " FROM pg_catalog.pg_replication_slots"
> + " WHERE slot_name = %s",
> + quote_literal_cstr(remote_slot->name));
>
> double space before FROM?
>
> ~~~
>
> 44. synchronize_one_slot
>
> + /*
> + * We might not have the WALs retained locally corresponding to
> + * remote's restart_lsn if our local restart_lsn and/or local
> + * catalog_xmin is ahead of remote's one. And thus we can not create
> + * the local slot in sync with primary as that would mean moving local
> + * slot backward. Thus wait for primary's restart_lsn and catalog_xmin
> + * to catch up with the local ones and then do the sync.
> + */
> + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
> + TransactionIdPrecedes(remote_slot->catalog_xmin,
> + MyReplicationSlot->data.catalog_xmin))
> + {
> + if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
> + {
> + /*
> + * The remote slot didn't catch up to locally reserved
> + * position
> + */
> + ReplicationSlotRelease();
> + CommitTransactionCommand();
> + return;
> + }
>
>
> SUGGESTION (comment is slightly simplified)
> If the local restart_lsn and/or local catalog_xmin is ahead of those
> on the remote then we cannot create the local slot in sync with
> primary because that would mean moving local slot backwards. In this
> case we will wait for primary's restart_lsn and catalog_xmin to catch
> up with the local one before attempting the sync.
>
> ======
> Kind Regards,
> Peter Smith.
> Fujitsu Australia
