Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-04-22 04:12:17
Message-ID: CAHut+PuqYP5eD5wcSCtk=a6KuMjat2UCzqyGoE7sieCaBsVskQ@mail.gmail.com
Lists: pgsql-hackers

Hello Hou-san. Here are my review comments for v4-0001. Sorry, there
are so many of them (it is a big patch); some are trivial, and others
you might easily dismiss due to my misunderstanding of the code. But
hopefully, there are at least some comments that can be helpful in
improving the patch quality.

======

1. General comment - terms

The patch needs to be more consistent about what exactly to call this new
worker. It is sometimes called "locally apply worker", sometimes "bgworker",
sometimes "subworker", sometimes "BGW", and sometimes other variations.
Please pick ONE good name and then update all the references/comments
in the patch to use that name consistently throughout.

~~~

2. General comment - option values

I felt the "streaming" option values ought to be different from what
this patch proposes, so that affected some of my following review
comments. (Later I give an example of what I think the values should be.)

~~~

3. General comment - bool option change to enum

This option change for "streaming" is similar to the options change
for "copy_data=force" that Vignesh is doing for his "infinite
recursion" patch v9-0002 [1]. Yet they seem implemented differently
(i.e. char versus enum). I think you should discuss the 2 approaches
with Vignesh and then code these option changes in a consistent way.

~~~

4. General comment - worker.c globals

There seems to be a growing number of global variables in the worker.c code.
I was wondering whether they are really necessary, because the logic becomes
more intricate if you have to know that some global was set up as a
side-effect of some other function call. E.g. maybe if you could do a
few more HTAB lookups to identify the bgworker then you might not need to
rely on the globals so much?

======

5. Commit message - typo

and then on the subscriber-side, the apply worker writes the changes into
temporary files and once it receives the commit, it read from the file and
apply the entire transaction. To improve the performance of such transactions,

typo: "read" -> "reads"
typo: "apply" -> "applies"

~~~

6. Commit message - wording

In this approach, we assign a new bgworker (if available) as soon as the xact's
first stream came and the main apply worker will send changes to this new
worker via shared memory. The bgworker will directly apply the change instead
of writing it to temporary files. We keep this worker assigned till the
transaction commit came and also wait for the worker to finish at commit. This

wording: "came" -> "is received" (2x)

~~~

7. Commit message - terms

(this is the same point as comment #1)

I think there is too much changing of terminology. IMO it will be
easier if you always just call the current main apply workers the
"apply worker" and always call this new worker the "bgworker" (or some
better name). But never just call it the "worker".

~~~

8. Commit message - typo

transaction commit came and also wait for the worker to finish at commit. This
preserves commit ordering and avoid writing to and reading from file in most
cases. We still need to spill if there is no worker available. We also need to

typo: "avoid" -> "avoids"

~~~

9. Commit message - wording/typo

Also extend the subscription streaming option so that user can control whether
apply the streaming transaction in a bgworker or spill the change to disk. User

wording: "Also extend" -> "This patch also extends"
typo: "whether apply" -> "whether to apply"

~~~

10. Commit message - option values

apply the streaming transaction in a bgworker or spill the change to disk. User
can set the streaming option to 'on/off', 'apply', 'spool'. For now, 'on' and

Those values do not really seem intuitive to me. E.g. if you set
"apply" then you already said above that sometimes it might have to
spool anyway if there were no bgworkers available. Why not just name
them like "on/off/parallel"?

(I have written more about this in a later comment #14)

======

11. doc/src/sgml/catalogs.sgml - wording

+ Controls in which modes we handle the streaming of in-progress
transactions.
+ <literal>f</literal> = disallow streaming of in-progress transactions

wording: "Controls in which modes we handle..." -> "Controls how to handle..."

~~~

12. doc/src/sgml/catalogs.sgml - wording

+ <literal>a</literal> = apply changes directly in background worker

wording: "in background worker" -> "using a background worker"

~~~

13. doc/src/sgml/catalogs.sgml - option values

Anyway, all this page will be different if I can persuade you to
change the option values (see comment #14)

======

14. doc/src/sgml/ref/create_subscription.sgml - option values

Since the default value is "off" I felt these options would be
better/simpler if they were just "off/on/parallel".
Specifically, I think "on" should behave the same as the current
code does, so the user has to deliberately choose to use this new
bgworker approach.

e.g.
- "off" = off, same as current PG15
- "on" = on, same as current PG15
- "parallel" = try to use the new bgworker to apply stream
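
To make the proposed semantics concrete, here is a minimal sketch of how the three values could map to an apply strategy. All enum and function names below are hypothetical (none of them come from the patch), and the fallback to spooling for "parallel" is based only on the commit message's statement that changes still need to spill when no worker is available.

```c
#include <stdbool.h>

/* Hypothetical names for the three proposed option values. */
typedef enum StreamingMode
{
    STREAMING_OFF,
    STREAMING_ON,
    STREAMING_PARALLEL
} StreamingMode;

/* Hypothetical: what the subscriber does with an in-progress transaction. */
typedef enum ApplyStrategy
{
    STRATEGY_NOT_STREAMED,      /* "off": same as current PG15 */
    STRATEGY_SPOOL_TO_FILE,     /* "on": same as current PG15 */
    STRATEGY_APPLY_IN_BGWORKER  /* "parallel": new bgworker approach */
} ApplyStrategy;

static ApplyStrategy
choose_apply_strategy(StreamingMode mode, bool bgworker_available)
{
    switch (mode)
    {
        case STREAMING_OFF:
            return STRATEGY_NOT_STREAMED;
        case STREAMING_ON:
            return STRATEGY_SPOOL_TO_FILE;
        case STREAMING_PARALLEL:
            /* Try the bgworker first; spool if none can be obtained. */
            return bgworker_available ? STRATEGY_APPLY_IN_BGWORKER
                                      : STRATEGY_SPOOL_TO_FILE;
    }
    return STRATEGY_NOT_STREAMED;   /* not reached for valid modes */
}
```

With this naming, "on" never changes behaviour for existing subscriptions, and the spooling fallback of "parallel" is implied by the word "try" rather than contradicting the option name.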

======

15. src/backend/commands/subscriptioncmds.c - SubOpts

Vignesh uses similar code for his "infinite recursion" patch being
developed [1], but he used an enum whereas here you use a char. I think you
should discuss this together and both decide to use either enum or char for
the member so there is consistency.

~~~

16. src/backend/commands/subscriptioncmds.c - combine conditions

+ /*
+ * The set of strings accepted here should match up with the
+ * grammar's opt_boolean_or_string production.
+ */
+ if (pg_strcasecmp(sval, "true") == 0)
+ return SUBSTREAM_APPLY;
+ if (pg_strcasecmp(sval, "false") == 0)
+ return SUBSTREAM_OFF;
+ if (pg_strcasecmp(sval, "on") == 0)
+ return SUBSTREAM_APPLY;
+ if (pg_strcasecmp(sval, "off") == 0)
+ return SUBSTREAM_OFF;
+ if (pg_strcasecmp(sval, "spool") == 0)
+ return SUBSTREAM_SPOOL;
+ if (pg_strcasecmp(sval, "apply") == 0)
+ return SUBSTREAM_APPLY;

Because I think the possible option values should be different to
these I can’t comment much on this code, except to suggest IMO the if
conditions should be combined where the options are considered to be
equivalent.
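
To illustrate what combining the conditions might look like, here is a standalone sketch that keeps the value mapping of the quoted hunk. `parse_streaming_value` is a hypothetical name, POSIX `strcasecmp` stands in for `pg_strcasecmp` so the fragment compiles outside the tree, and the -1 result for an unrecognized string is an assumption rather than what the patch does.

```c
#include <strings.h>    /* POSIX strcasecmp, standing in for pg_strcasecmp */

/* Values as defined in the patch's pg_subscription hunk. */
#define SUBSTREAM_OFF   'f'
#define SUBSTREAM_SPOOL 's'
#define SUBSTREAM_APPLY 'a'

/*
 * Hypothetical helper: map an option string to a SUBSTREAM_* value, testing
 * all equivalent spellings in one condition. Returns -1 for an unrecognized
 * string (an assumption made for this sketch).
 */
static int
parse_streaming_value(const char *sval)
{
    if (strcasecmp(sval, "false") == 0 ||
        strcasecmp(sval, "off") == 0)
        return SUBSTREAM_OFF;

    if (strcasecmp(sval, "true") == 0 ||
        strcasecmp(sval, "on") == 0 ||
        strcasecmp(sval, "apply") == 0)
        return SUBSTREAM_APPLY;

    if (strcasecmp(sval, "spool") == 0)
        return SUBSTREAM_SPOOL;

    return -1;
}
```

Grouping the spellings this way makes it obvious at a glance which strings are synonyms, which the flat list of six separate tests does not.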

======

17. src/backend/replication/logical/launcher.c - stop_worker

@@ -72,6 +72,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void stop_worker(LogicalRepWorker *worker);

The function name does not seem consistent with the other similar static funcs.

~~~

18. src/backend/replication/logical/launcher.c - change if

@@ -225,7 +226,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool
only_running)
LogicalRepWorker *w = &LogicalRepCtx->workers[i];

if (w->in_use && w->subid == subid && w->relid == relid &&
- (!only_running || w->proc))
+ (!only_running || w->proc) && !w->subworker)
{

Maybe the code would be easier to read (and to comment) if you did something like:

/* TODO: comment here */
if (w->subworker)
continue;

~~~

19. src/backend/replication/logical/launcher.c -
logicalrep_worker_launch comment

@@ -262,9 +263,9 @@ logicalrep_workers_find(Oid subid, bool only_running)
/*
* Start new apply background worker, if possible.
*/
-void
+bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+ Oid relid, dsm_handle subworker_dsm)

The "Start new apply..." comment feels a bit misleading. E.g. this
function is also called to start the sync worker, and also for the main
apply worker (which we are not really calling a "background worker" in other
places). So this is the same kind of terminology problem as my review
comment #1.

~~~

20. src/backend/replication/logical/launcher.c - asserts?

I thought maybe there should be some assertions in this code upfront.
E.g. cannot have OidIsValid(relid) and subworker_dsm valid at the same
time.
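
Such an upfront check could look like the sketch below. The typedefs and macros are simplified stand-ins for the PostgreSQL definitions so that the fragment compiles on its own, and `launch_args_are_consistent` is a hypothetical helper name, not something in the patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL definitions. */
typedef uint32_t Oid;
typedef uint32_t dsm_handle;
#define InvalidOid          ((Oid) 0)
#define DSM_HANDLE_INVALID  ((dsm_handle) 0)
#define OidIsValid(oid)     ((oid) != InvalidOid)

/*
 * Hypothetical helper: a worker is launched either for a table sync (valid
 * relid) or as a bgworker of the apply worker (valid DSM handle), never both.
 */
static bool
launch_args_are_consistent(Oid relid, dsm_handle subworker_dsm)
{
    bool is_tablesync = OidIsValid(relid);
    bool is_subworker = (subworker_dsm != DSM_HANDLE_INVALID);

    return !(is_tablesync && is_subworker);
}
```

In the real function this could presumably collapse into a single `Assert()` on the same expression near the top of logicalrep_worker_launch.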

~~~

21. src/backend/replication/logical/launcher.c - terms

+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication apply worker for subscription %u", subid);

I think the names of all these workers are still a bit vague in the
messages, e.g. "logical replication worker" versus "logical
replication apply worker" sound too similar to me. So this is kind of
the same as my review comment #1.

~~~

22. src/backend/replication/logical/launcher.c -
logicalrep_worker_stop double unlock?

@@ -450,6 +465,18 @@ logicalrep_worker_stop(Oid subid, Oid relid)
return;
}

+ stop_worker(worker);
+
+ LWLockRelease(LogicalRepWorkerLock);
+}

IIUC, sometimes it seems that stop_worker() function might already
release the lock before it returns. In that case won’t this other
explicit lock release be a problem?

~~~

23. src/backend/replication/logical/launcher.c - logicalrep_worker_detach

@@ -600,6 +625,28 @@ logicalrep_worker_attach(int slot)
static void
logicalrep_worker_detach(void)
{
+ /*
+ * If we are the main apply worker, stop all the sub apply workers we
+ * started before.
+ */
+ if (!MyLogicalRepWorker->subworker)
+ {
+ List *workers;
+ ListCell *lc;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+ foreach(lc, workers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->subworker)
+ stop_worker(w);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);

Can this have the same double-unlock problem as I described in the
previous review comment #22?

~~~

24. src/backend/replication/logical/launcher.c - ApplyLauncherMain

@@ -869,7 +917,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;

logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}

Now that logicalrep_worker_launch is returning a bool, should this
call be checking the return value and taking appropriate action if it
failed?

======

25. src/backend/replication/logical/origin.c - acquire comment

+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)

The comment was not very clear to me. Does the term "apply worker" in
the comment make sense, or should that say "bgworker"? This might be
another example of my review comment #1.

~~~

26. src/backend/replication/logical/origin.c - acquire code

+ /*
+ * We allow the apply worker to get the slot which is acquired by its
+ * leader process.
+ */
+ else if (curstate->acquired_by != 0 && acquire)
{
ereport(ERROR,

I somehow felt that this param would be better called 'skip_acquire',
so all the callers would have to use the opposite boolean and then
this code would say like below (which seemed easier to me). YMMV.

else if (curstate->acquired_by != 0 && !skip_acquire)
{
ereport(ERROR,

======

27. src/backend/replication/logical/tablesync.c

@@ -568,7 +568,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
- rstate->relid);
+ rstate->relid,
+ DSM_HANDLE_INVALID);
hentry->last_start_time = now;

Now that the logicalrep_worker_launch is returning a bool, should this
call be checking that the launch was successful before it changes the
last_start_time?

======

28. src/backend/replication/logical/worker.c - file comment

+ * 1) Separate background workers
+ *
+ * Assign a new bgworker (if available) as soon as the xact's first stream came
+ * and the main apply worker will send changes to this new worker via shared
+ * memory. We keep this worker assigned till the transaction commit came and
+ * also wait for the worker to finish at commit. This preserves commit ordering
+ * and avoid writing to and reading from file in most cases. We still need to
+ * spill if there is no worker available. We also need to allow stream_stop to
+ * complete by the background worker to finish it to avoid deadlocks because
+ * T-1's current stream of changes can update rows in conflicting order with
+ * T-2's next stream of changes.

This comment fragment looks the same as the commit message so the
typos/wording reported already for the commit message are applicable
here too.

~~~

29. src/backend/replication/logical/worker.c - file comment

+ * If no worker is available to handle streamed transaction, we write the data
* to temporary files and then applied at once when the final commit arrives.

wording: "we write the data" -> "the data is written"

~~~

30. src/backend/replication/logical/worker.c - ParallelState

+typedef struct ParallelState

Add to typedefs.list

~~~

31. src/backend/replication/logical/worker.c - ParallelState flags

+typedef struct ParallelState
+{
+ slock_t mutex;
+ bool attached;
+ bool ready;
+ bool finished;
+ bool failed;
+ Oid subid;
+ TransactionId stream_xid;
+ uint32 n;
+} ParallelState;

Those bool states look independent to me. Should they be one enum
member instead of lots of bool members?
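
As a rough sketch of the single-member alternative: this assumes the four flags describe stages that a worker is in one at a time, which the patch does not state. All names are hypothetical, the field types are simplified to plain integers, and the mutex is omitted so the fragment compiles standalone.

```c
#include <stdint.h>

/* Hypothetical status enum replacing attached/ready/finished/failed. */
typedef enum ApplyBgworkerStatus
{
    APPLY_BGWORKER_STARTING = 0,    /* launched but not attached yet */
    APPLY_BGWORKER_ATTACHED,
    APPLY_BGWORKER_READY,
    APPLY_BGWORKER_FINISHED,
    APPLY_BGWORKER_FAILED
} ApplyBgworkerStatus;

/* Sketch of the shared struct with one status member (mutex omitted). */
typedef struct ParallelStateSketch
{
    ApplyBgworkerStatus status;
    uint32_t    subid;          /* stand-in for Oid */
    uint32_t    stream_xid;     /* stand-in for TransactionId */
    uint32_t    n;
} ParallelStateSketch;

/* Name of a status value, e.g. for log messages. */
static const char *
apply_bgworker_status_name(ApplyBgworkerStatus status)
{
    switch (status)
    {
        case APPLY_BGWORKER_STARTING:
            return "starting";
        case APPLY_BGWORKER_ATTACHED:
            return "attached";
        case APPLY_BGWORKER_READY:
            return "ready";
        case APPLY_BGWORKER_FINISHED:
            return "finished";
        case APPLY_BGWORKER_FAILED:
            return "failed";
    }
    return "unknown";
}
```

If some of the flags really can be true at the same time, separate bools (or a bitmask) would still be needed, so this only applies when the states are exclusive.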

~~~

32. src/backend/replication/logical/worker.c - ParallelState comments

+typedef struct ParallelState
+{
+ slock_t mutex;
+ bool attached;
+ bool ready;
+ bool finished;
+ bool failed;
+ Oid subid;
+ TransactionId stream_xid;
+ uint32 n;
+} ParallelState;

Needs some comments. Some might be self-evident but some are not -
e.g. what is 'n'?

~~~

33. src/backend/replication/logical/worker.c - WorkerState

+typedef struct WorkerState

Add to typedefs.list

~~~

34. src/backend/replication/logical/worker.c - WorkerEntry

+typedef struct WorkerEntry

Add to typedefs.list

~~~

35. src/backend/replication/logical/worker.c - static function names

+/* Worker setup and interactions */
+static void setup_dsm(WorkerState *wstate);
+static WorkerState *setup_background_worker(void);
+static void wait_for_worker_ready(WorkerState *wstate, bool notify);
+static void wait_for_transaction_finish(WorkerState *wstate);
+static void send_data_to_worker(WorkerState *wstate, Size nbytes,
+ const void *data);
+static WorkerState *find_or_start_worker(TransactionId xid, bool start);
+static void free_stream_apply_worker(void);
+static bool transaction_applied_in_bgworker(TransactionId xid);
+static void check_workers_status(void);

All these new functions have random-looking names. Since they all are
new to this feature I thought they should all be named similarly...

e.g. something like
bgworker_setup
bgworker_check_status
bgworker_wait_for_ready
etc.

~~~

36. src/backend/replication/logical/worker.c - nchanges

+
+static uint32 nchanges = 0;
+

What is this? Needs a comment.

~~~

37. src/backend/replication/logical/worker.c - handle_streamed_transaction

static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
- TransactionId xid;
+ TransactionId current_xid = InvalidTransactionId;

/* not in streaming mode */
- if (!in_streamed_transaction)
+ if (!in_streamed_transaction && !isLogicalApplyWorker)
return false;

Is it correct to be testing the isLogicalApplyWorker here?

e.g. What if the streaming code is not using bgworkers at all?

At least maybe that comment (/* not in streaming mode */) should be updated?

~~~

38. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ if (current_xid != stream_xid &&
+ !list_member_int(subxactlist, (int) current_xid))
+ {
+ MemoryContext oldctx;
+ char *spname = (char *) palloc(64 * sizeof(char));
+ sprintf(spname, "savepoint_for_xid_%u", current_xid);

Can't the name just be a char[64] on the stack?
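
A stack buffer version might look like the sketch below. It assumes that whatever consumes the name copies it before the buffer goes out of scope; `format_savepoint_name` and `SAVEPOINT_NAME_LEN` are hypothetical names, and `uint32_t` stands in for TransactionId.

```c
#include <stdio.h>
#include <stdint.h>

#define SAVEPOINT_NAME_LEN 64   /* same size as the palloc'd buffer */

/*
 * Hypothetical helper: format the savepoint name into a caller-supplied
 * buffer. snprintf never writes past buflen; the return value is the
 * length the full name needs (excluding the terminating NUL).
 */
static int
format_savepoint_name(char *buf, size_t buflen, uint32_t xid)
{
    return snprintf(buf, buflen, "savepoint_for_xid_%u", (unsigned int) xid);
}
```

The caller would then declare `char spname[SAVEPOINT_NAME_LEN];` locally, which avoids both the allocation and any question about who frees it, and the bounded snprintf replaces the unbounded sprintf.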

~~~

39. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /*
+ * XXX The publisher side don't always send relation update message
+ * after the streaming transaction, so update the relation in main
+ * worker here.
+ */

typo: "don't" -> "doesn't" ?

~~~

40. src/backend/replication/logical/worker.c - apply_handle_commit_prepared

@@ -976,30 +1116,51 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];

logicalrep_read_commit_prepared(s, &prepare_data);
+
set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);

Spurious whitespace?

~~~

41. src/backend/replication/logical/worker.c - apply_handle_commit_prepared

+ /* Check if we have prepared transaction in another bgworker */
+ if (transaction_applied_in_bgworker(prepare_data.xid))
+ {
+ elog(DEBUG1, "received commit for streamed transaction %u", prepare_data.xid);

- /* There is no transaction when COMMIT PREPARED is called */
- begin_replication_step();
+ /* Send commit message */
+ send_data_to_worker(stream_apply_worker, s->len, s->data);

It seems a bit complex/tricky that the code is always relying on all
the side-effects that the global stream_apply_worker will be set.

I am not sure if it is possible to remove the global and untangle
everything. E.g. why not change transaction_applied_in_bgworker to
return the bgworker (instead of returning bool), so that the caller can
assign it to a local var in this function?

Or can’t you do HTAB lookup in a few more places instead of carrying
around the knowledge of some global var that was initialized in some
other place?

It would be easier if you can eliminate having to be aware of
side-effects happening behind the scenes.
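
The shape of that change could be sketched as follows. A plain array scan stands in for the HTAB lookup, and the struct and function names are hypothetical; the point is only that the lookup result travels through the return value instead of through a global.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* stand-in for the PostgreSQL type */

/* Hypothetical, cut-down stand-in for the patch's WorkerState. */
typedef struct WorkerStateSketch
{
    TransactionId xid;          /* transaction assigned to this bgworker */
    int           worker_no;
} WorkerStateSketch;

/*
 * Return the bgworker applying the given xid, or NULL if the transaction
 * is not being applied in a bgworker. No global state is touched.
 */
static WorkerStateSketch *
find_bgworker_for_xid(WorkerStateSketch *workers, size_t nworkers,
                      TransactionId xid)
{
    for (size_t i = 0; i < nworkers; i++)
    {
        if (workers[i].xid == xid)
            return &workers[i];
    }
    return NULL;
}
```

A handler would then keep the result in a local variable, test it against NULL, and pass it explicitly to the send function, so nothing depends on an earlier call having set a global.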

~~~

42. src/backend/replication/logical/worker.c - apply_handle_rollback_prepared

@@ -1019,35 +1180,51 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];

logicalrep_read_rollback_prepared(s, &rollback_data);
+
set_apply_error_context_xact(rollback_data.xid,
rollback_data.rollback_end_lsn);

Spurious whitespace?

~~~

43. src/backend/replication/logical/worker.c - apply_handle_rollback_prepared

+ /* Check if we are processing the prepared transaction in a bgworker */
+ if (transaction_applied_in_bgworker(rollback_data.xid))
+ {
+ send_data_to_worker(stream_apply_worker, s->len, s->data);

Same as previous comment #41. Relies on the side effect of something
setting the global stream_apply_worker.

~~~

44. src/backend/replication/logical/worker.c - find_or_start_worker

+ /*
+ * For streaming transactions that is being applied in bgworker, we cannot
+ * decide whether to apply the change for a relation that is not in the
+ * READY state (see should_apply_changes_for_rel) as we won't know
+ * remote_final_lsn by that time. So, we don't start new bgworker in this
+ * case.
+ */

typo: "that is" -> "that are"

~~~

45. src/backend/replication/logical/worker.c - find_or_start_worker

+ if (MySubscription->stream != SUBSTREAM_APPLY)
+ return NULL;
...
+ else if (start && !XLogRecPtrIsInvalid(MySubscription->skiplsn))
+ return NULL;
...
+ else if (start && !AllTablesyncsReady())
+ return NULL;
+ else if (!start && ApplyWorkersHash == NULL)
+ return NULL;

I am not sure but I think most of that rejection if/else can probably
just be "if" (not "else if") because otherwise, the code would have
returned anyhow, right? Removing all the "else" might make the code
more readable.
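
For example, the rejection tests could read as a flat list of guard clauses. In this sketch the struct fields are hypothetical booleans standing in for the expressions in the quoted hunk, and it ignores whatever code the "..." lines elide, so it only shows the control-flow shape.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the conditions tested in the quoted hunk. */
typedef struct StartConditions
{
    bool stream_is_apply;   /* MySubscription->stream == SUBSTREAM_APPLY */
    bool skiplsn_is_set;    /* !XLogRecPtrIsInvalid(MySubscription->skiplsn) */
    bool tablesyncs_ready;  /* AllTablesyncsReady() */
    bool hash_exists;       /* ApplyWorkersHash != NULL */
} StartConditions;

/* Each rejection is an independent "if"; every branch returns immediately. */
static bool
bgworker_may_be_used(const StartConditions *cond, bool start)
{
    if (!cond->stream_is_apply)
        return false;

    if (start && cond->skiplsn_is_set)
        return false;

    if (start && !cond->tablesyncs_ready)
        return false;

    if (!start && !cond->hash_exists)
        return false;

    return true;
}
```

Because every branch returns, dropping the "else" does not change behaviour, and each test can carry its own comment.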

~~~

46. src/backend/replication/logical/worker.c - find_or_start_worker

+ if (wstate == NULL)
+ {
+ /*
+ * If there is no more worker can be launched here, remove the
+ * entry in hash table.
+ */
+ hash_search(ApplyWorkersHash, &xid, HASH_REMOVE, &found);
+ return NULL;
+ }

wording: "If there is no more worker can be launched here, remove" ->
"If the bgworker cannot be launched, remove..."

~~~

47. src/backend/replication/logical/worker.c - free_stream_apply_worker

+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+free_stream_apply_worker(void)

IMO it might be better to pass the bgworker here instead of silently
working with the global stream_apply_worker.

~~~

48. src/backend/replication/logical/worker.c - free_stream_apply_worker

+ elog(LOG, "adding finished apply worker #%u for xid %u to the idle list",
+ stream_apply_worker->pstate->n, stream_apply_worker->pstate->stream_xid);

Should there be an Assert here to check the bgworker state really was FINISHED?

~~~

49. src/backend/replication/logical/worker.c - serialize_stream_prepare

+static void
+serialize_stream_prepare(LogicalRepPreparedTxnData *prepare_data)

Missing function comment.

~~~

50. src/backend/replication/logical/worker.c - serialize_stream_start

-/*
- * Handle STREAM START message.
- */
static void
-apply_handle_stream_start(StringInfo s)
+serialize_stream_start(bool first_segment)

Missing function comment.

~~~

51. src/backend/replication/logical/worker.c - serialize_stream_stop

+static void
+serialize_stream_stop()
+{

Missing function comment.

~~~

52. src/backend/replication/logical/worker.c - general serialize_XXXX

I can see now that you have created many serialize_XXX functions which
seem to only be called one time. It looks like the only purpose is to
encapsulate the code to make the handler function shorter? But it
seems a bit uneven that you did this only for the serialize cases. If
you really want these separate functions then perhaps there ought to
also be the equivalent bgworker functions too. There seem to be always
3 scenarios:

i.e.
1. Worker is the bgworker
2. Worker is Main Apply but a bgworker exists
3. Worker is Main apply and bgworker does not exist.

Perhaps every handler function should have THREE other little
functions that it calls appropriately?
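
One way to picture the three scenarios is a small classifier that each handler consults before calling the matching helper. The enum and function names are hypothetical, and the two boolean inputs stand in for however the patch determines "am I the bgworker" and "is a bgworker assigned to this transaction".

```c
#include <stdbool.h>

/* Hypothetical names for the three scenarios listed above. */
typedef enum HandlerPath
{
    PATH_APPLY_IN_BGWORKER,     /* 1. this process is the bgworker */
    PATH_FORWARD_TO_BGWORKER,   /* 2. main apply worker, bgworker exists */
    PATH_SERIALIZE_TO_FILE      /* 3. main apply worker, no bgworker */
} HandlerPath;

static HandlerPath
choose_handler_path(bool am_bgworker, bool bgworker_assigned)
{
    if (am_bgworker)
        return PATH_APPLY_IN_BGWORKER;

    if (bgworker_assigned)
        return PATH_FORWARD_TO_BGWORKER;

    return PATH_SERIALIZE_TO_FILE;
}
```

Each handler could then switch on the result and call one small function per path, which would make the serialize_XXX helpers one of three symmetrical cases instead of a special case.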

~~~

53. src/backend/replication/logical/worker.c - serialize_stream_abort

+
+static void
+serialize_stream_abort(TransactionId xid, TransactionId subxid)
+{

Missing function comment.

~~~

54. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ if (isLogicalApplyWorker)
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("[Apply BGW #%u] aborting current transaction xid=%u, subxid=%u",
+ MyParallelState->n, GetCurrentTransactionIdIfAny(),
GetCurrentSubTransactionId())));

Why is the errcode using errcode_for_file_access? (2x)

~~~

55. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ /*
+ * OK, so it's a subxact. Rollback to the savepoint.
+ *
+ * We also need to read the subxactlist, determine the offset
+ * tracked for the subxact, and truncate the list.
+ */
+ int i;
+ bool found = false;
+ char *spname = (char *) palloc(64 * sizeof(char));

Can that just be char[64] on the stack?

~~~

56. src/backend/replication/logical/worker.c - apply_dispatch

@@ -2511,6 +3061,7 @@ apply_dispatch(StringInfo s)
break;

case LOGICAL_REP_MSG_STREAM_START:
+ elog(LOG, "LOGICAL_REP_MSG_STREAM_START");
apply_handle_stream_start(s);
break;

I guess this is just for debugging purposes so you should put some
FIXME comment here as a reminder to get rid of it later?

~~~

57. src/backend/replication/logical/worker.c - store_flush_position,
isLogicalApplyWorker

@@ -2618,6 +3169,10 @@ store_flush_position(XLogRecPtr remote_lsn)
{
FlushPosition *flushpos;

+ /* We only need to collect the LSN in main apply worker */
+ if (isLogicalApplyWorker)
+ return;
+

This comment is not specific to this function, but for global
isLogicalApplyWorker IMO this should be implemented to look more like
the inline function am_tablesync_worker().

e.g. I think you should replace this global with something like
am_apply_bgworker()

Maybe it should do something like check the value of
MyLogicalRepWorker->subworker?
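
A possible shape for such a function is sketched below. The struct is a one-field stand-in for LogicalRepWorker, the global is a local stand-in for the real MyLogicalRepWorker pointer, and the NULL check is an assumption added so the sketch is safe before the worker slot is attached.

```c
#include <stdbool.h>
#include <stddef.h>

/* One-field stand-in for LogicalRepWorker. */
typedef struct LogicalRepWorkerSketch
{
    bool subworker;     /* set for the new apply bgworker */
} LogicalRepWorkerSketch;

/* Stand-in for the real per-process worker slot pointer. */
static LogicalRepWorkerSketch *MyLogicalRepWorker = NULL;

/* Modelled on am_tablesync_worker(): derive the role from the worker slot. */
static inline bool
am_apply_bgworker(void)
{
    return MyLogicalRepWorker != NULL && MyLogicalRepWorker->subworker;
}
```

Call sites such as store_flush_position would then test `am_apply_bgworker()` instead of a separately maintained global flag.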

~~~

58. src/backend/replication/logical/worker.c - LogicalRepApplyLoop

@@ -3467,6 +4025,7 @@ TwoPhaseTransactionGid(Oid subid, TransactionId
xid, char *gid, int szgid)
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}

+
/*
* Execute the initial sync with error handling. Disable the subscription,
* if it's required.

Spurious whitespace

~~~

59. src/backend/replication/logical/worker.c - ApplyWorkerMain

@@ -3733,7 +4292,7 @@ ApplyWorkerMain(Datum main_arg)

options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
- options.proto.logical.streaming = MySubscription->stream;
+ options.proto.logical.streaming = (MySubscription->stream != SUBSTREAM_OFF);
options.proto.logical.twophase = false;

I was not sure why this is converting from an enum to a boolean? Is it right?

~~~

60. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+ shmq_res = shm_mq_receive(mqh, &len, &data, false);
+
+ if (shmq_res != SHM_MQ_SUCCESS)
+ break;

Should this log some more error information here?

~~~

61. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+ if (len == 0)
+ {
+ elog(LOG, "[Apply BGW #%u] got zero-length message, stopping", pst->n);
+ break;
+ }
+ else
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;

Maybe the "else" is not needed here, and if you remove it then it will
get rid of all the unnecessary indentation.

~~~

62. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+ /*
+ * We use first byte of message for additional communication between
+ * main Logical replication worker and Apply BGWorkers, so if it
+ * differs from 'w', then process it first.
+ */

I was thinking maybe this switch should include

case 'w':
break;

because then for the "default" case you should give ERROR because
something unexpected arrived.
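
The idea, reduced to a standalone sketch: the only byte value taken from the patch is 'w'; the enum, the function name, and the use of a return code in place of ereport(ERROR) are assumptions made so the fragment compiles on its own. The patch's other control bytes are not shown in the quoted hunk, so they appear here only as a placeholder comment.

```c
/* Hypothetical classification of the first byte of a queue message. */
typedef enum FirstByteClass
{
    FIRST_BYTE_DATA,        /* 'w': ordinary change data */
    FIRST_BYTE_UNEXPECTED   /* anything not explicitly handled */
} FirstByteClass;

static FirstByteClass
classify_first_byte(char c)
{
    switch (c)
    {
        case 'w':
            /* Normal data message: nothing special to do. */
            return FIRST_BYTE_DATA;

        /* The patch's own control bytes would get case labels here. */

        default:
            /* The real code would raise an ERROR at this point. */
            return FIRST_BYTE_UNEXPECTED;
    }
}
```

Listing 'w' explicitly means the default branch is reached only for bytes nobody anticipated, which is exactly the situation worth reporting loudly.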

~~~

63. src/backend/replication/logical/worker.c - ApplyBgwShutdown

+static void
+ApplyBgwShutdown(int code, Datum arg)
+{
+ SpinLockAcquire(&MyParallelState->mutex);
+ MyParallelState->failed = true;
+ SpinLockRelease(&MyParallelState->mutex);
+
+ dsm_detach((dsm_segment *) DatumGetPointer(arg));
+}

Should this do detach first and set the flag last?

~~~

64. src/backend/replication/logical/worker.c - LogicalApplyBgwMain

+ /*
+ * Acquire a worker number.
+ *
+ * By convention, the process registering this background worker should
+ * have stored the control structure at key 0. We look up that key to
+ * find it. Our worker number gives our identity: there may be just one
+ * worker involved in this parallel operation, or there may be many.
+ */

Maybe there should be another elog closer to this comment? So as soon
as you know the BGW number log something?

e.g.
elog(LOG, "[Apply BGW #%u] starting", pst->n);

~~~

65. src/backend/replication/logical/worker.c - setup_background_worker

+/*
+ * Register background workers.
+ */
+static WorkerState *
+setup_background_worker(void)

I think that comment needs some more info because it is doing more
than just registering... it is successfully launching the worker
first.

~~~

66. src/backend/replication/logical/worker.c - setup_background_worker

+ if (launched)
+ {
+ /* Wait for worker to become ready. */
+ wait_for_worker_ready(wstate, false);
+
+ ApplyWorkersList = lappend(ApplyWorkersList, wstate);
+ nworkers += 1;
+ }

Do you really need to carry around this global 'nworkers' variable?
Can’t you just check the length of the ApplyWorkerList to get this
number?

~~~

67. src/backend/replication/logical/worker.c - send_data_to_worker

+/*
+ * Send the data to worker via shared-memory queue.
+ */
+static void
+send_data_to_worker(WorkerState *wstate, Size nbytes, const void *data)

wording: "to worker" -> "to the specified apply bgworker"

This is just another example of my comment #1.

~~~

68. src/backend/replication/logical/worker.c - send_data_to_worker

+ if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send tuple to shared-memory queue")));
+}

wording: is "tuple" the right word here?

~~~

69. src/backend/replication/logical/worker.c - wait_for_worker_ready

+
+static void
+wait_for_worker_ready(WorkerState *wstate, bool notify)
+{

Missing function comment.

~~~

70. src/backend/replication/logical/worker.c - wait_for_worker_ready

+
+static void
+wait_for_worker_ready(WorkerState *wstate, bool notify)
+{

'notify' seems a bit of a poor name here. And this param seems a bit
of a strange side-effect for something called wait_for_worker_ready.
If you really need to do it this way, maybe name it something more verbose
like 'notify_received_stream_stop'?

~~~

71. src/backend/replication/logical/worker.c - wait_for_worker_ready

+ if (!result)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("one or more background workers failed to start")));

Is the ERROR code reachable? IIUC there is no escape from the previous
for (;;) loop except when the result is set to true.

~~~

72. src/backend/replication/logical/worker.c - wait_for_transaction_finish

+
+static void
+wait_for_transaction_finish(WorkerState *wstate)
+{

Missing function comment.

~~~

73. src/backend/replication/logical/worker.c - wait_for_transaction_finish

+ if (finished)
+ {
+ break;
+ }

The brackets are not needed for 1 statement.

~~~

74. src/backend/replication/logical/worker.c - transaction_applied_in_bgworker

+static bool
+transaction_applied_in_bgworker(TransactionId xid)

Instead of side-effect assigning the global variable, why not return
the bgworker (or NULL) and let the caller work with the result?

~~~

75. src/backend/replication/logical/worker.c - check_workers_status

+/*
+ * Check the status of workers and report an error if any bgworker exit
+ * unexpectedly.

wording: -> "... if any bgworker has exited unexpectedly ..."

~~~

76. src/backend/replication/logical/worker.c - check_workers_status

+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u exited unexpectedly",
+ wstate->pstate->n)));

Should that message also give more identifying info about the
*current* worker raising the ERROR, e.g. the one which found that the
other bgworker had failed? Or is just the PID in the log message
good enough?

~~~

77. src/backend/replication/logical/worker.c - check_workers_status

+ if (!AllTablesyncsReady() && nfreeworkers != list_length(ApplyWorkersList))
+ {

I did not really understand this code, but isn't there a possibility
that it will cause many restarts if the tablesyncs are taking a long
time to complete?

======

78. src/include/catalog/pg_subscription.

@@ -122,6 +122,18 @@ typedef struct Subscription
List *publications; /* List of publication names to subscribe to */
} Subscription;

+/* Disallow streaming in-progress transactions */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming transactions are written to a temporary file and applied only
+ * after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_SPOOL 's'
+
+/* Streaming transactions are appied immediately via a background worker */
+#define SUBSTREAM_APPLY 'a'

IIRC Vignesh had a similar options requirement for his "infinite
recursion" patch [1], except he was using enums instead of #define for
char. Maybe discuss with Vignesh (and either he should change or you
should change) so there is a consistent code style for the options.

======

79. src/include/replication/logicalproto.h - old extern

@@ -243,8 +243,10 @@ extern TransactionId
logicalrep_read_stream_start(StringInfo in,
extern void logicalrep_write_stream_stop(StringInfo out);
extern void logicalrep_write_stream_commit(StringInfo out,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
-extern TransactionId logicalrep_read_stream_commit(StringInfo out,
+extern TransactionId logicalrep_read_stream_commit_old(StringInfo out,
LogicalRepCommitData *commit_data);

Is anybody still using this "old" function? Maybe I missed it.

======

80. src/include/replication/logicalworker.h

@@ -13,6 +13,7 @@
#define LOGICALWORKER_H

extern void ApplyWorkerMain(Datum main_arg);
+extern void LogicalApplyBgwMain(Datum main_arg);

The new name seems inconsistent with the old one. What about calling
it ApplyBgworkerMain?

======

81. src/test/regress/expected/subscription.out

Isn't this missing some test cases for the new options added? E.g. I
never see the streaming value set to 's'.

======

82. src/test/subscription/t/029_on_error.pl

If the option values were changed as I suggested (review comment #14)
then I think a change such as this would not be necessary because
everything would be backward compatible.

------
[1] https://www.postgresql.org/message-id/CALDaNm2Fe%3Dg4Tx-DhzwD6NU0VRAfaPedXwWO01maNU7_OfS8fw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
