Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-15 11:39:45
Message-ID: CAA4eK1LMVdS6uM7Tw7ANL0BetAd76TKkmAXNNQa0haTe2tax6g@mail.gmail.com
Lists: pgsql-hackers

On Thu, Sep 15, 2022 at 10:45 AM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Attach the new patch set.
>

Review of v29-0001*
==================
1.
+parallel_apply_find_worker(TransactionId xid)
{
...
+    entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);
+    if (found)
+    {
+        /* If any workers (or the postmaster) have died, we have failed. */
+        if (entry->winfo->error_mq_handle == NULL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("lost connection to parallel apply worker")));
...
}

I think the above comment is incorrect because if the postmaster had
died, you wouldn't have found the entry in the hash table. How about
something like: "We can't proceed if the parallel streaming worker has
already exited."

2.
+/*
+ * Find the previously assigned worker for the given transaction, if any.
+ */
+ParallelApplyWorkerInfo *
+parallel_apply_find_worker(TransactionId xid)

There is no need to use the word 'previously' in the above sentence.

3.
+ * We need one key to register the location of the header, and we need
+ * another key to track the location of the message queue.
+ */
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
+ shm_toc_estimate_chunk(&e, queue_size);
+ shm_toc_estimate_chunk(&e, error_queue_size);
+
+ shm_toc_estimate_keys(&e, 3);

Overall, three keys are used but the comment mentions only two. You
forgot to mention the error queue.

4.
+    if (launched)
+        ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
+    else
+    {
+        shm_mq_detach(winfo->mq_handle);
+        shm_mq_detach(winfo->error_mq_handle);
+        dsm_detach(winfo->dsm_seg);
+        pfree(winfo);
+
+        winfo = NULL;
+    }

A. The code used in the else part to free the worker info is the same
as what is used in parallel_apply_free_worker. Can we move this to a
separate function, say parallel_apply_free_worker_info()?
B. I think it will be better if you use {} for the if branch as well,
to make it look consistent with the else branch.
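
For 4A, a rough sketch of the kind of helper meant here. Only the
function name comes from the suggestion above; FakeHandle, fake_detach()
and detach_calls are hypothetical stand-ins so that the fragment compiles
outside the PostgreSQL tree. The real code would call shm_mq_detach(),
dsm_detach() and pfree() as in the quoted hunk:

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for the shm_mq/dsm handles used by the patch. */
typedef struct FakeHandle
{
    int attached;
} FakeHandle;

/* Simplified stand-in for the patch's ParallelApplyWorkerInfo. */
typedef struct ParallelApplyWorkerInfo
{
    FakeHandle *mq_handle;
    FakeHandle *error_mq_handle;
    FakeHandle *dsm_seg;
} ParallelApplyWorkerInfo;

/* Counts detach calls so the behaviour can be checked in this sketch. */
int detach_calls = 0;

void
fake_detach(FakeHandle *handle)
{
    handle->attached = 0;
    detach_calls++;
}

/*
 * Release everything attached to winfo and free it. Both the launch
 * failure path and parallel_apply_free_worker() could call this.
 */
void
parallel_apply_free_worker_info(ParallelApplyWorkerInfo *winfo)
{
    assert(winfo != NULL);

    fake_detach(winfo->mq_handle);          /* shm_mq_detach() in the patch */
    fake_detach(winfo->error_mq_handle);    /* shm_mq_detach() in the patch */
    fake_detach(winfo->dsm_seg);            /* dsm_detach() in the patch */
    free(winfo);                            /* pfree() in the patch */
}
```

With such a helper, the else branch would reduce to a call to the helper
followed by winfo = NULL.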

5.
+ * case define a named savepoint, so that we are able to commit/rollback it
+ * separately later.
+ */
+void
+parallel_apply_subxact_info_add(TransactionId current_xid)

I don't see the need to mention commit in the above comment. So, we
can slightly modify it to: "... so that we are able to rollback to it
separately later."

6.
+    for (i = list_length(subxactlist) - 1; i >= 0; i--)
+    {
+        xid = list_nth_xid(subxactlist, i);
...
...

+/*
+ * Return the TransactionId value contained in the n'th element of the
+ * specified list.
+ */
+static inline TransactionId
+list_nth_xid(const List *list, int n)
+{
+    Assert(IsA(list, XidList));
+    return lfirst_xid(list_nth_cell(list, n));
+}

I am not sure we need a new list function just for this one place.
Can't we directly use lfirst_xid(list_nth_cell(subxactlist, i))
instead?

7.
+void
+parallel_apply_replorigin_setup(void)
+{
+    RepOriginId originid;
+    char        originname[NAMEDATALEN];
+    bool        started_tx = false;
+
+    /* This function might be called inside or outside of transaction. */
+    if (!IsTransactionState())
+    {
+        StartTransactionCommand();
+        started_tx = true;
+    }

Is there a place in the patch where this function will be called
without having an active transaction state? If so, then this coding is
fine but if not, then I suggest keeping an assert for transaction
state here. The same thing applies to
parallel_apply_replorigin_reset() as well.

8.
+ *
+ * If write_abort_lsn is true, send the abort_lsn and abort_time fields,
+ * otherwise don't.
*/
void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
-                              TransactionId subxid)
+                              TransactionId subxid, XLogRecPtr abort_lsn,
+                              TimestampTz abort_time, bool abort_info)

The comment refers to write_abort_lsn, but the parameter is named
abort_info. The variable name in the comment needs to be updated.

9.
+TransactionId stream_xid = InvalidTransactionId;

-static TransactionId stream_xid = InvalidTransactionId;
...
...
+void
+parallel_apply_subxact_info_add(TransactionId current_xid)
+{
+    if (current_xid != stream_xid &&
+        !list_member_xid(subxactlist, current_xid))

It seems you have changed the scope of stream_xid to use it in
parallel_apply_subxact_info_add(). Won't it be better to pass it as a
parameter (say top_xid)?

10.
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -20,6 +20,7 @@
#include <sys/time.h>

#include "access/xlog.h"
+#include "catalog/pg_subscription.h"
#include "catalog/pg_type.h"
#include "common/connect.h"
#include "funcapi.h"
@@ -443,9 +444,14 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
    appendStringInfo(&cmd, "proto_version '%u'",
                     options->proto.logical.proto_version);

-    if (options->proto.logical.streaming &&
-        PQserverVersion(conn->streamConn) >= 140000)
-        appendStringInfoString(&cmd, ", streaming 'on'");
+    if (options->proto.logical.streaming != SUBSTREAM_OFF)
+    {
+        if (PQserverVersion(conn->streamConn) >= 160000 &&
+            options->proto.logical.streaming == SUBSTREAM_PARALLEL)
+            appendStringInfoString(&cmd, ", streaming 'parallel'");
+        else if (PQserverVersion(conn->streamConn) >= 140000)
+            appendStringInfoString(&cmd, ", streaming 'on'");
+    }

It doesn't seem like a good idea to expose subscription options here.
Could we instead have a char *streaming_option, filled in by the
caller and used here directly, in place of the current streaming
parameter?
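
One possible reading of this, as a standalone sketch: the function name
append_streaming_option() and the plain char buffer are assumptions made
for illustration. The real code would keep using appendStringInfo() on
cmd and PQserverVersion() on the connection:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Append ", streaming '<value>'" to cmd when the caller supplied a value
 * and the server is new enough to understand the option. The receiver
 * no longer needs to know the SUBSTREAM_* values; it just passes the
 * string through.
 */
void
append_streaming_option(char *cmd, size_t cmdsize,
                        const char *streaming_option, int server_version)
{
    size_t len;

    assert(cmd != NULL);

    len = strlen(cmd);
    if (streaming_option != NULL && server_version >= 140000 &&
        len < cmdsize)
        snprintf(cmd + len, cmdsize - len, ", streaming '%s'",
                 streaming_option);
}
```

Note that in this form the fallback from 'parallel' to 'on' for older
publishers is not handled here, so that decision would have to be made
by the caller or dealt with elsewhere.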

11. The error message used in pgoutput_startup() seems to be better
than the current messages used in that function but it is better to be
consistent with other messages. There is a discussion in the email
thread [1] on improving those messages, so kindly suggest there.

12. In addition to the above, I have changed/added a few comments in
the attached patch.

[1] - https://www.postgresql.org/message-id/20220914.111507.13049297635620898.horikyota.ntt%40gmail.com

--
With Regards,
Amit Kapila.

Attachment Content-Type Size
changed_comments_amit_v29.patch application/octet-stream 2.6 KB
