Re: Time delayed LR (WAS Re: logical replication restrictions)

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "Takamichi Osumi (Fujitsu)" <osumi(dot)takamichi(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Kyotaro Horiguchi <horikyota(dot)ntt(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "vignesh21(at)gmail(dot)com" <vignesh21(at)gmail(dot)com>, "shveta(dot)malik(at)gmail(dot)com" <shveta(dot)malik(at)gmail(dot)com>, "dilipbalaut(at)gmail(dot)com" <dilipbalaut(at)gmail(dot)com>, "euler(at)eulerto(dot)com" <euler(at)eulerto(dot)com>, "m(dot)melihmutlu(at)gmail(dot)com" <m(dot)melihmutlu(at)gmail(dot)com>, "andres(at)anarazel(dot)de" <andres(at)anarazel(dot)de>, "marcos(at)f10(dot)com(dot)br" <marcos(at)f10(dot)com(dot)br>, "pgsql-hackers(at)postgresql(dot)org" <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Time delayed LR (WAS Re: logical replication restrictions)
Date: 2023-02-13 08:52:49
Message-ID: CAHut+Punm+_Vq7aosEuDj17Kny9Td636HE7R2-LaDZ2+FCrGpg@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for the v34 patch.

======
src/backend/replication/logical/worker.c

1.

+/* The last time we send a feedback message */
+static TimestampTz send_time = 0;
+

IMO this is a bad variable name. When this variable was changed to be
global it ought to have been renamed.

The name "send_time" is almost meaningless without any contextual information.

But also it's bad because this global name is "shadowed" by several
other parameters and other local variables using that same name (e.g.
see UpdateWorkerStats, LogicalRepApplyLoop, etc). It is too confusing.

How about using a unique, meaningful name (with a matching comment) to
improve readability and remove the unwanted shadowing?

SUGGESTION
/* Timestamp of when the last feedback message was sent. */
static TimestampTz last_sent_feedback_ts = 0;
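
To illustrate the shadowing problem, here is a minimal standalone sketch
(not code from the patch). The TimestampTz typedef is a stand-in, and
lag_since, lag_since_last_feedback and shadowing_demo are hypothetical
names made up for this example.

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t TimestampTz;	/* stand-in for the PostgreSQL typedef */

/* File-scope variable with the generic name used in the v34 patch. */
static TimestampTz send_time = 0;

/*
 * Hypothetical function whose parameter reuses the name. Inside the body,
 * "send_time" refers to the parameter; the file-scope variable is hidden.
 */
static TimestampTz
lag_since(TimestampTz send_time, TimestampTz now)
{
	return now - send_time;
}

/* Renamed as suggested, so no parameter can hide it by accident. */
static TimestampTz last_sent_feedback_ts = 0;

static TimestampTz
lag_since_last_feedback(TimestampTz now)
{
	return now - last_sent_feedback_ts;
}

/* Small self-check showing which variable each function reads. */
static void
shadowing_demo(void)
{
	send_time = 1000;
	last_sent_feedback_ts = 1000;

	/* Reads the parameter (400), not the file-scope value (1000). */
	assert(lag_since(400, 1500) == 1100);
	assert(send_time == 1000);

	/* Reads the file-scope value (1000). */
	assert(lag_since_last_feedback(1500) == 500);
}
```

Compiling something like this with gcc -Wshadow should report the
parameter hiding the file-scope declaration, which is the confusion I am
trying to avoid.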

~~~

2. maybe_apply_delay

+ /* Apply the delay by the latch mechanism */
+ do
+ {
+ TimestampTz delayUntil;
+ long diffms;
+
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* This might change wal_receiver_status_interval */
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /*
+ * Before calculating the time duration, reload the catalog if needed.
+ */
+ if (!in_remote_transaction && !in_streamed_transaction)
+ {
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
+
+ delayUntil = TimestampTzPlusMilliseconds(finish_ts, MySubscription->minapplydelay);
+ diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), delayUntil);
+
+ /*
+ * Exit without arming the latch if it's already past time to apply
+ * this transaction.
+ */
+ if (diffms <= 0)
+ break;
+
+ elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %d ms, remaining wait time: %ld ms",
+ xid, MySubscription->minapplydelay, diffms);
+
+ /*
+ * Call send_feedback() to prevent the publisher from exiting by
+ * timeout during the delay, when the status interval is greater than
+ * zero.
+ */
+ if (!status_interval_ms)
+ {
+ TimestampTz nextFeedback;
+
+ /*
+ * Based on the last time when we send a feedback message, adjust
+ * the first delay time for this transaction. This ensures that
+ * the first feedback message follows wal_receiver_status_interval
+ * interval.
+ */
+ nextFeedback = TimestampTzPlusMilliseconds(send_time,
+ wal_receiver_status_interval * 1000L);
+ status_interval_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), nextFeedback);
+ }
+ else
+ status_interval_ms = wal_receiver_status_interval * 1000L;
+
+ if (status_interval_ms > 0 && diffms > status_interval_ms)
+ {
+ WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ status_interval_ms,
+ WAIT_EVENT_LOGICAL_APPLY_DELAY);
+ send_feedback(last_received, true, false, true);
+ }
+ else
+ WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ diffms,
+ WAIT_EVENT_LOGICAL_APPLY_DELAY);
+
+ } while (true);

~

IMO this logic has been tweaked too many times without revisiting the
variable names and logic from scratch, so it has become over-complex:
- some variable names have taken on multiple meanings
- multiple '* 1000L' conversions have crept back in again
- 'diffms' is too generic now that there are so many variables, so it
has lost its meaning
- GetCurrentTimestamp() is called in multiple places

SUGGESTIONS
- rename some variables and simplify the logic.
- reduce all the if/else
- don't be sneaky with the meaning of status_interval_ms
- 'diffms' --> 'remaining_delay_ms'
- 'delayUntil' --> 'delay_until_ts'
- introduce 'now' variable
- simplify the check of (next_feedback_due_ms < remaining_delay_ms)

SUGGESTION (WFM)

/* Apply the delay by the latch mechanism */
while (true)
{
    TimestampTz now;
    TimestampTz delay_until_ts;
    long remaining_delay_ms;
    long status_interval_ms;

    ResetLatch(MyLatch);

    CHECK_FOR_INTERRUPTS();

    /* This might change wal_receiver_status_interval */
    if (ConfigReloadPending)
    {
        ConfigReloadPending = false;
        ProcessConfigFile(PGC_SIGHUP);
    }

    /*
     * Before calculating the time duration, reload the catalog if needed.
     */
    if (!in_remote_transaction && !in_streamed_transaction)
    {
        AcceptInvalidationMessages();
        maybe_reread_subscription();
    }

    now = GetCurrentTimestamp();
    delay_until_ts = TimestampTzPlusMilliseconds(finish_ts,
                                                 MySubscription->minapplydelay);
    remaining_delay_ms = TimestampDifferenceMilliseconds(now, delay_until_ts);

    /*
     * Exit without arming the latch if it's already past time to apply
     * this transaction.
     */
    if (remaining_delay_ms <= 0)
        break;

    elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %d ms, remaining wait time: %ld ms",
         xid, MySubscription->minapplydelay, remaining_delay_ms);

    /*
     * If a status interval is defined then we may need to call send_feedback()
     * early to prevent the publisher from exiting during a long apply delay.
     */
    status_interval_ms = wal_receiver_status_interval * 1000L;
    if (status_interval_ms > 0)
    {
        TimestampTz next_feedback_due_ts;
        long next_feedback_due_ms;

        /*
         * Find if the next feedback is due earlier than the remaining delay ms.
         */
        next_feedback_due_ts = TimestampTzPlusMilliseconds(send_time,
                                                           status_interval_ms);
        next_feedback_due_ms = TimestampDifferenceMilliseconds(now,
                                                               next_feedback_due_ts);
        if (next_feedback_due_ms < remaining_delay_ms)
        {
            /* delay before feedback */
            WaitLatch(MyLatch,
                      WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                      next_feedback_due_ms,
                      WAIT_EVENT_LOGICAL_APPLY_DELAY);
            send_feedback(last_received, true, false, true);
            continue;
        }
    }

    /* delay before apply */
    WaitLatch(MyLatch,
              WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
              remaining_delay_ms,
              WAIT_EVENT_LOGICAL_APPLY_DELAY);
}
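
To make the decision at the heart of that loop easier to reason about (and
to test in isolation), here is a rough standalone sketch of just the
"which wait comes next" calculation. It is not patch code: timestamps are
plain int64 milliseconds, and DelayStep, DelayAction, diff_ms_clamped and
plan_delay_step are hypothetical names invented for this example. It
assumes the time difference is clamped at zero, which is how I understand
TimestampDifferenceMilliseconds() to behave.

```c
#include <assert.h>
#include <stdint.h>

/* What the loop should do on this iteration. */
typedef enum
{
	DELAY_DONE,					/* delay has elapsed; go and apply */
	DELAY_WAIT_THEN_FEEDBACK,	/* wait, then send a feedback message */
	DELAY_WAIT_THEN_RECHECK		/* wait out the rest of the apply delay */
} DelayAction;

typedef struct
{
	DelayAction action;
	int64_t		wait_ms;
} DelayStep;

/* Difference in ms, clamped at zero (assumption, see lead-in). */
static int64_t
diff_ms_clamped(int64_t start_ms, int64_t stop_ms)
{
	return (stop_ms > start_ms) ? stop_ms - start_ms : 0;
}

static DelayStep
plan_delay_step(int64_t now_ms, int64_t finish_ms, int64_t min_apply_delay_ms,
				int64_t last_feedback_ms, int status_interval_s)
{
	DelayStep	step;
	int64_t		remaining_delay_ms;
	int64_t		status_interval_ms;

	assert(min_apply_delay_ms >= 0 && status_interval_s >= 0);

	remaining_delay_ms = diff_ms_clamped(now_ms, finish_ms + min_apply_delay_ms);
	if (remaining_delay_ms <= 0)
	{
		step.action = DELAY_DONE;
		step.wait_ms = 0;
		return step;
	}

	/* Convert seconds to milliseconds exactly once. */
	status_interval_ms = (int64_t) status_interval_s * 1000;
	if (status_interval_ms > 0)
	{
		int64_t		next_feedback_due_ms =
			diff_ms_clamped(now_ms, last_feedback_ms + status_interval_ms);

		/* Feedback falls due before the apply delay ends. */
		if (next_feedback_due_ms < remaining_delay_ms)
		{
			step.action = DELAY_WAIT_THEN_FEEDBACK;
			step.wait_ms = next_feedback_due_ms;
			return step;
		}
	}

	step.action = DELAY_WAIT_THEN_RECHECK;
	step.wait_ms = remaining_delay_ms;
	return step;
}
```

For example, with a 30000 ms apply delay, a 10 s status interval and
feedback sent at time 0, plan_delay_step(0, 0, 30000, 0, 10) asks for a
10000 ms wait followed by feedback. With the status interval set to 0 the
same call asks for a single 30000 ms wait.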

======
src/include/utils/wait_event.h

3.
@@ -149,7 +149,8 @@ typedef enum
WAIT_EVENT_REGISTER_SYNC_REQUEST,
WAIT_EVENT_SPIN_DELAY,
WAIT_EVENT_VACUUM_DELAY,
- WAIT_EVENT_VACUUM_TRUNCATE
+ WAIT_EVENT_VACUUM_TRUNCATE,
+ WAIT_EVENT_LOGICAL_APPLY_DELAY
} WaitEventTimeout;

FYI - The PGDOCS has a section with "Table 28.13. Wait Events of Type
Timeout", so if you are going to add a new Timeout event then you also
need to document it (alphabetically) in that table.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
