RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>
Cc: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-05-30 08:51:59
Message-ID: OS3PR01MB6275797F66EF0A47EEB2D8FC9EDD9@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wed, May 18, 2022 3:11 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are my review comments for v6-0001.
Thanks for your comments.

> 7. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal
>
> +
> + Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
>
> I think there should be a comment here to say that this lock is
> required/expected to be released by the caller of this function.
IMHO, the code here is easy enough to read without such a comment.
In addition, I wanted to stay consistent with the other places in the same
file that invoke this function, so I did not change this.
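The locking contract under discussion is: the caller acquires LogicalRepWorkerLock, the internal function only asserts (via LWLockHeldByMe()) that the lock is held, and the caller releases it afterwards. The sketch below models that contract in plain C. `DemoLock`, `demo_worker_stop()` and `demo_worker_stop_internal()` are hypothetical stand-ins, not PostgreSQL's LWLock API.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for an LWLock: only tracks whether we hold it. */
typedef struct DemoLock
{
    bool held_by_me;
} DemoLock;

static DemoLock demo_worker_lock = {false};
static int demo_stopped_workers = 0;

static void
demo_lock_acquire(DemoLock *lock)
{
    assert(!lock->held_by_me);
    lock->held_by_me = true;
}

static void
demo_lock_release(DemoLock *lock)
{
    assert(lock->held_by_me);
    lock->held_by_me = false;
}

static bool
demo_lock_held_by_me(const DemoLock *lock)
{
    return lock->held_by_me;
}

/* Internal helper: expects the caller to hold the lock and to release it later. */
static void
demo_worker_stop_internal(void)
{
    assert(demo_lock_held_by_me(&demo_worker_lock));
    demo_stopped_workers++;
}

/* Caller: acquires the lock, calls the helper, then releases the lock. */
static void
demo_worker_stop(void)
{
    demo_lock_acquire(&demo_worker_lock);
    demo_worker_stop_internal();
    demo_lock_release(&demo_worker_lock);
}
```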

> 9. src/backend/replication/logical/worker.c - General
>
> Some of the logs have a prefix "[Apply BGW #%u]" and some do not; I
> did not really understand how you decided to prefix or not so I did
> not comment about them individually. Are they all OK? Perhaps if you
> can explain the reason for the choices I can review it better next
> time.
I think most of these messages should only be logged in debug mode, so I
changed them to the "DEBUG1" level.
I also added the prefix to all messages logged by the apply background worker,
and deleted some messages that I think were not very helpful.
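A minimal sketch of applying the "[Apply BGW #%u]" prefix uniformly, so that no message from the apply background worker is missed. `format_bgw_message()` is a hypothetical helper and the number substituted for %u is just a parameter here; in the backend the prefix would simply be part of the elog(DEBUG1, ...) format string.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write "[Apply BGW #<id>] " followed by the formatted
 * message into buf. Returns the length of the complete message (as snprintf
 * would); if even the prefix does not fit, returns only the prefix length.
 * Returns a negative value on a formatting error.
 */
static int
format_bgw_message(char *buf, size_t len, unsigned int id, const char *fmt, ...)
{
    va_list args;
    int prefix_len;
    int msg_len;

    assert(buf != NULL && len > 0 && fmt != NULL);

    prefix_len = snprintf(buf, len, "[Apply BGW #%u] ", id);
    if (prefix_len < 0 || (size_t) prefix_len >= len)
        return prefix_len;

    va_start(args, fmt);
    msg_len = vsnprintf(buf + prefix_len, len - (size_t) prefix_len, fmt, args);
    va_end(args);

    return msg_len < 0 ? msg_len : prefix_len + msg_len;
}
```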

> 11. src/backend/replication/logical/worker.c - file header comment
>
> The whole comment is similar to the commit message so any changes made
> there (for #2, #3) should be made here also.
Improved the comments as suggested in #2.
Sorry, but I did not find the same message as in #2 here.

> 13. src/backend/replication/logical/worker.c
>
> WorkerState
> WorkerEntry
>
> I felt that these struct names seem too generic - shouldn't they be
> something more like ApplyBgworkerState, ApplyBgworkerEntry
>
> ~~~
Since the name "ApplyBgworkerState" is already used in the patch, I renamed
these as follows:
```
ApplyBgworkerState -> ApplyBgworkerStatus
WorkerState -> ApplyBgworkerState
WorkerEntry -> ApplyBgworkerEntry
```
BTW, I also modified the relevant comments and variable names.

> 16. src/backend/replication/logical/worker.c - handle_streamed_transaction
>
> + * For the main apply worker, if in streaming mode (receiving a block of
> + * streamed transaction), we send the data to the apply background worker.
> + *
> + * For the apply background worker, define a savepoint if new subtransaction
> + * was started.
> *
> * Returns true for streamed transactions, false otherwise (regular mode).
> */
> static bool
> handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
>
> 16a.
> Typo: "if new subtransaction" -> "if a new subtransaction"
>
> 16b.
> That "regular mode" comment seems not quite right because IIUC it also
> returns false also for a bgworker (which hardly seems like a "regular
> mode")
16a. Improved it as suggested.
16b. Changed the comment as follows:
From:
```
* Returns true for streamed transactions, false otherwise (regular mode).
```
To:
```
* For non-streamed transactions, returns false;
* For streamed transactions, returns true if in main apply worker, false
* otherwise.
```
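The new return-value contract can be summarised as a small decision function. This is a simplified sketch: `DemoWorkerKind` and `demo_handle_streamed_transaction()` are hypothetical, and the savepoint handling done by the apply background worker is reduced to a comment.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for "main apply worker or apply background worker?". */
typedef enum DemoWorkerKind
{
    DEMO_MAIN_APPLY_WORKER,
    DEMO_APPLY_BGWORKER
} DemoWorkerKind;

/* Returns true when the caller must not apply the change itself. */
static bool
demo_handle_streamed_transaction(DemoWorkerKind kind, bool streamed)
{
    /* Non-streamed transaction: always false, the change is applied normally. */
    if (!streamed)
        return false;

    /* Main apply worker: the data is sent to the apply background worker. */
    if (kind == DEMO_MAIN_APPLY_WORKER)
        return true;

    /*
     * Apply background worker: define a savepoint if a new subtransaction was
     * started (omitted here), then let the caller apply the change.
     */
    assert(kind == DEMO_APPLY_BGWORKER);
    return false;
}
```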

> 19. src/backend/replication/logical/worker.c - find_or_start_apply_bgworker
>
> + if (found)
> + {
> + entry->wstate->pstate->state = APPLY_BGWORKER_BUSY;
> + return entry->wstate;
> + }
> + else if (!start)
> + return NULL;
> +
> + /* If there is at least one worker in the idle list, then take one. */
> + if (list_length(ApplyWorkersIdleList) > 0)
>
> I felt that there should be a comment (after the return NULL) that says:
>
> /*
> * Start a new apply background worker
> */
>
> ~~~
Improved this comment.
After the code you mentioned, the function tries to get an apply background
worker (either by taking one from the idle list or by starting a new one), so
I changed the comment as follows:
From:
```
/* If there is at least one worker in the idle list, then take one. */
```
To:
```
/*
* Now, we try to get a apply background worker.
* If there is at least one worker in the idle list, then take one.
* Otherwise, we try to start a new apply background worker.
*/
```
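The lookup order described in the new comment can be sketched as follows. Everything here is hypothetical: a fixed-size array replaces the patch's hash table and ApplyWorkersIdleList, `DEMO_MAX_WORKERS` stands in for whatever limit applies, and "starting" a worker just claims a pool slot. Returning NULL when no worker can be obtained is also an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_MAX_WORKERS 4       /* hypothetical limit on started workers */

typedef enum DemoBgwStatus
{
    DEMO_BGW_IDLE,
    DEMO_BGW_BUSY
} DemoBgwStatus;

typedef struct DemoBgworker
{
    bool          assigned;      /* currently handling a transaction? */
    unsigned int  xid;           /* that transaction, if assigned */
    DemoBgwStatus status;
} DemoBgworker;

static DemoBgworker  demo_pool[DEMO_MAX_WORKERS];   /* started workers */
static int           demo_npool = 0;
static DemoBgworker *demo_idle[DEMO_MAX_WORKERS];   /* idle list (a stack) */
static int           demo_nidle = 0;

static DemoBgworker *
demo_find_or_start(unsigned int xid, bool start)
{
    DemoBgworker *w;

    /* Already assigned to this transaction? Mark it busy and reuse it. */
    for (int i = 0; i < demo_npool; i++)
    {
        if (demo_pool[i].assigned && demo_pool[i].xid == xid)
        {
            demo_pool[i].status = DEMO_BGW_BUSY;
            return &demo_pool[i];
        }
    }
    if (!start)
        return NULL;

    /* Now try to get a worker: take one from the idle list if possible... */
    if (demo_nidle > 0)
        w = demo_idle[--demo_nidle];
    /* ...otherwise start a new one, if the limit allows. */
    else if (demo_npool < DEMO_MAX_WORKERS)
        w = &demo_pool[demo_npool++];
    else
        return NULL;

    w->assigned = true;
    w->xid = xid;
    w->status = DEMO_BGW_BUSY;
    return w;
}

/* Transaction finished: detach the worker and put it on the idle list. */
static void
demo_release(DemoBgworker *w)
{
    assert(w->assigned);
    w->assigned = false;
    w->status = DEMO_BGW_IDLE;
    demo_idle[demo_nidle++] = w;
}
```

Taking from the idle list first means a finished worker is reused before a new process is started.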

> 22. src/backend/replication/logical/worker.c - apply_handle_stream_start
>
> /*
> - * Initialize the worker's stream_fileset if we haven't yet. This will be
> - * used for the entire duration of the worker so create it in a permanent
> - * context. We create this on the very first streaming message from any
> - * transaction and then use it for this and other streaming transactions.
> - * Now, we could create a fileset at the start of the worker as well but
> - * then we won't be sure that it will ever be used.
> + * If we are in main apply worker, check if there is any free bgworker
> + * we can use to process this transaction.
> */
> - if (MyLogicalRepWorker->stream_fileset == NULL)
> + stream_apply_worker = apply_bgworker_find_or_start(stream_xid,
> first_segment);
>
> 22a.
> Typo: "in main apply worker" -> "in the main apply worker"
>
> 22b.
> Since this is not if/else code, it might be better to put
> Assert(!am_apply_bgworker()); above this just to make it more clear.
22a. Improved it as suggested.
22b.
IMHO, since the `if (am_apply_bgworker())` block above always returns, only the
main apply worker can reach this code, so an Assert() here would be redundant.
So I did not change this.
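A reduced sketch of the control flow in question: because the background-worker branch returns, the code after it only runs in the main apply worker, so an Assert(!am_apply_bgworker()) placed there could never fail. `demo_am_apply_bgworker()` and `demo_stream_start()` are hypothetical stand-ins for am_apply_bgworker() and apply_handle_stream_start().

```c
#include <assert.h>
#include <stdbool.h>

static bool demo_is_bgworker = false;   /* hypothetical process role flag */

static bool
demo_am_apply_bgworker(void)
{
    return demo_is_bgworker;
}

/* Returns true if the message was handled on the background-worker path. */
static bool
demo_stream_start(void)
{
    if (demo_am_apply_bgworker())
    {
        /* Background-worker handling would go here. */
        return true;
    }

    /* Only the main apply worker gets here, so this check always holds. */
    assert(!demo_am_apply_bgworker());

    /* Main-worker handling (finding or starting a bgworker) would go here. */
    return false;
}
```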

> 26. src/backend/replication/logical/worker.c - apply_handle_stream_abort
>
> + if (found)
> + {
> + elog(LOG, "rolled back to savepoint %s", spname);
> + RollbackToSavepoint(spname);
> + CommitTransactionCommand();
> + subxactlist = list_truncate(subxactlist, i + 1);
> + }
>
> Does this need to log anything if nothing was found? Or is it ok to
> leave as-is and silently ignore it?
Yes, I think it is okay.
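The "found" case above can be modelled as follows: rolling back to the savepoint for entry i discards every later savepoint but keeps savepoint i itself, which is why the list is truncated to i + 1 entries. When no savepoint matches, the sketch silently does nothing, as the reply says is acceptable. This is a hypothetical sketch: an array replaces subxactlist, RollbackToSavepoint() and CommitTransactionCommand() are reduced to a comment, and searching from the newest entry is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_MAX_SUBXACTS 16

/* Hypothetical stand-in for subxactlist: subxids with a savepoint, oldest first. */
static unsigned int demo_subxacts[DEMO_MAX_SUBXACTS];
static size_t       demo_nsubxacts = 0;

static void
demo_define_savepoint(unsigned int subxid)
{
    assert(demo_nsubxacts < DEMO_MAX_SUBXACTS);
    demo_subxacts[demo_nsubxacts++] = subxid;
}

/* Handle a stream abort for subxid; returns whether a savepoint was found. */
static bool
demo_stream_abort_subxact(unsigned int subxid)
{
    /* Search from the newest savepoint backwards. */
    for (size_t i = demo_nsubxacts; i-- > 0;)
    {
        if (demo_subxacts[i] == subxid)
        {
            /* The patch calls RollbackToSavepoint() and CommitTransactionCommand() here. */
            demo_nsubxacts = i + 1;    /* like list_truncate(subxactlist, i + 1) */
            return true;
        }
    }
    return false;                      /* not found: silently ignored */
}
```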

> 33. src/backend/replication/logical/worker.c - check_workers_status
>
> +/* Set the state of apply background worker */
> +static void
> +apply_bgworker_set_state(char state)
>
> Maybe OK, or perhaps choose from one of:
> - "Set the state of an apply background worker"
> - "Set the apply background worker state"
Improved it by using the second one.

> 34. src/bin/pg_dump/pg_dump.c - getSubscriptions
>
> @@ -4450,7 +4450,7 @@ getSubscriptions(Archive *fout)
> if (fout->remoteVersion >= 140000)
> appendPQExpBufferStr(query, " s.substream,\n");
> else
> - appendPQExpBufferStr(query, " false AS substream,\n");
> + appendPQExpBufferStr(query, " 'f' AS substream,\n");
>
>
> Is that logic right? Before this patch the attribute was bool; now it
> is char. So doesn't there need to be some conversion/mapping here for
> when you read from >= 140000 but it was still bool so you need to
> convert 'false' -> 'f' and 'true' -> 't'?
Yes, I think it is right.
The input of the "streaming" option can be on/true/off/false/apply, and we
handle all of these.
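A minimal sketch of the mapping under discussion, assuming (hypothetically) that the patch stores streaming = apply as 'a' in the char column. It accepts only the five exact spellings listed above; PostgreSQL's own boolean option parsing (defGetBoolean()) is more lenient, for example it ignores letter case. The sketch also relies on one libpq fact relevant to the pg_dump question: in the default text result format a boolean column is returned as "t" or "f", the same strings as the char values, so a value read back from either server version can go through the same mapping.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Parse a "streaming" option value into its stored char code.
 * Returns '\0' for an unrecognised value. 'a' for apply is hypothetical.
 */
static char
demo_parse_streaming(const char *value)
{
    assert(value != NULL);
    if (strcmp(value, "on") == 0 || strcmp(value, "true") == 0)
        return 't';
    if (strcmp(value, "off") == 0 || strcmp(value, "false") == 0)
        return 'f';
    if (strcmp(value, "apply") == 0)
        return 'a';
    return '\0';
}

/*
 * Map a substream value read back as text ("t"/"f" from a boolean column,
 * or 't'/'f'/'a' from the char column) to an option value for
 * CREATE SUBSCRIPTION ... WITH (streaming = ...). Returns NULL if unknown.
 */
static const char *
demo_streaming_option(const char *substream)
{
    assert(substream != NULL);
    if (strcmp(substream, "f") == 0)
        return "off";
    if (strcmp(substream, "t") == 0)
        return "on";
    if (strcmp(substream, "a") == 0)
        return "apply";
    return NULL;
}
```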

The rest of the comments are improved as suggested.

Thanks also to Shi Yu for improving patch 0002 by addressing the comments in
[1].

Attached are the new patches (only 0001 and 0002 have changed).

[1] - https://www.postgresql.org/message-id/CAHut%2BPv_0nfUxriwxBQnZTOF5dy5nfG5NtWMr8e00mPrt2Vjzw%40mail.gmail.com

Regards,
Wang wei

Attachment Content-Type Size
v7-0001-Perform-streaming-logical-transactions-by-backgro.patch application/octet-stream 74.7 KB
v7-0002-Test-streaming-apply-option-in-tap-test.patch application/octet-stream 64.7 KB
v7-0003-Add-some-checks-before-using-apply-background-wor.patch application/octet-stream 17.8 KB
v7-0004-Add-a-GUC-max_apply_bgworkers_per_subscription-to.patch application/octet-stream 6.9 KB
