RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-10-07 06:18:09
Message-ID: OS0PR01MB5716B400CD81565E868616DB945F9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, September 30, 2022 4:27 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Here are my review comments for the v35-0001 patch:

Thanks for the comments.

> 3. GENERAL
> I found the mixed use of the same member names having different meanings to be quite confusing.
>
> e.g.1
> PGOutputData 'streaming' is now a single char internal representation of the subscription parameter streaming mode ('f','t','p')
> - bool streaming;
> + char streaming;
>
> e.g.2
> WalRcvStreamOptions 'streaming' is a C string version of the subscription streaming mode ("on", "parallel")
> - bool streaming; /* Streaming of large transactions */
> + char *streaming; /* Streaming of large transactions */
>
> e.g.3
> SubOpts 'streaming' is again like the first example - a single char for the mode.
> - bool streaming;
> + char streaming;
>
>
> IMO everything would become much simpler if you did:
>
> 3a.
> Rename "char streaming;" -> "char streaming_mode;"

The word 'streaming' is the same as the actual option name, so personally I think it's fine.
But if others also agree that the name can be improved, I can change it.

>
> 3b. Re-designed the "char *streaming;" code to also use the single char
> notation, then also call that member 'streaming_mode'. Then everything will
> be consistent.

If we use a single byte (char) here, we would need to compare it with the standard
streaming option value in libpqwalreceiver.c, which it was suggested not to do [1].
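
As an illustration only (this is not code from the patch), the two
representations discussed above could be bridged by a small helper on the
caller's side, so that the walreceiver code only ever sees the C string. The
function name streaming_mode_to_option and the MODE_* macros are hypothetical;
the values 'f', 't', 'p' and "on", "parallel" are the ones quoted in the review.

```c
#include <stddef.h>

/* Hypothetical stand-ins for the single-char streaming modes ('f','t','p'). */
#define MODE_OFF      'f'
#define MODE_ON       't'
#define MODE_PARALLEL 'p'

/*
 * Hypothetical helper: map the single-char mode to the C string option
 * value. Returns NULL when streaming is off or the mode is not recognized.
 */
const char *
streaming_mode_to_option(char mode)
{
	switch (mode)
	{
		case MODE_ON:
			return "on";
		case MODE_PARALLEL:
			return "parallel";
		case MODE_OFF:
		default:
			return NULL;
	}
}
```

With a helper like this, the char form never has to be compared against option
values inside libpqwalreceiver.c; whether that is preferable to keeping a
"char *" member is the open question in this sub-thread.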

> 4. - max_parallel_apply_workers_per_subscription
> + </para>
> + <para>
> + The parallel apply workers are taken from the pool defined by
> + <varname>max_logical_replication_workers</varname>.
> + </para>
> + <para>
> + The default value is 2. This parameter can only be set in the
> + <filename>postgresql.conf</filename> file or on the server command
> + line.
> + </para>
> + </listitem>
> + </varlistentry>
>
> I felt that maybe this should also xref to the
> doc/src/sgml/logical-replication.sgml section where you say about
> "max_logical_replication_workers should be increased according to the
> desired number of parallel apply workers."

Not sure about this, as we don't have a similar cross-reference in the documentation of
max_logical_replication_workers and max_sync_workers_per_subscription.

> ======
>
> 7. src/backend/access/transam/xact.c - RecordTransactionAbort
>
>
> + /*
> + * Are we using the replication origins feature? Or, in other words, are
> + * we replaying remote actions?
> + */
> + replorigin = (replorigin_session_origin != InvalidRepOriginId &&
> + replorigin_session_origin != DoNotReplicateId);
>
> "Or, in other words," -> "In other words,"

I think it is better to keep this consistent with the comments in function
RecordTransactionCommit.

> 10b.
> IMO this flag might be better to be called 'parallel_apply_enabled' or something similar.
> (see also review comment #55b.)

Not sure about this.

> 12. - parallel_apply_free_worker
>
> + SpinLockAcquire(&winfo->shared->mutex);
> + slot_no = winfo->shared->logicalrep_worker_slot_no;
> + generation = winfo->shared->logicalrep_worker_generation;
> + SpinLockRelease(&winfo->shared->mutex);
>
> I know there are not many places doing this, but do you think it might be
> worth introducing some new set/get function to encapsulate the set/get of the
> generation/slot so it does the mutex spin-locks in common code?

Not sure about this.
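
For reference, a minimal standalone sketch of what such accessors might look
like (not code from the patch). Everything here is an assumption made for
illustration: the struct SharedInfoSketch, the sketch_* function names, the
field types, and the use of a C11 atomic_flag as a stand-in for PostgreSQL's
spinlock type. Only the two field names come from the quoted hunk.

```c
#include <stdatomic.h>

/* Simplified stand-in for the shared struct; field types are assumed. */
typedef struct SharedInfoSketch
{
	atomic_flag mutex;		/* stand-in for the real spinlock */
	int			logicalrep_worker_slot_no;
	unsigned short logicalrep_worker_generation;
} SharedInfoSketch;

/* Put the struct into a known, unlocked state. */
void
sketch_init(SharedInfoSketch *shared)
{
	atomic_flag_clear(&shared->mutex);
	shared->logicalrep_worker_slot_no = 0;
	shared->logicalrep_worker_generation = 0;
}

static void
sketch_lock(SharedInfoSketch *shared)
{
	/* Spin until the flag was previously clear, i.e. we now own it. */
	while (atomic_flag_test_and_set_explicit(&shared->mutex,
											 memory_order_acquire))
		;
}

static void
sketch_unlock(SharedInfoSketch *shared)
{
	atomic_flag_clear_explicit(&shared->mutex, memory_order_release);
}

/* Hypothetical setter: both fields change under one lock acquisition. */
void
sketch_set_worker_slot(SharedInfoSketch *shared, int slot_no,
					   unsigned short generation)
{
	sketch_lock(shared);
	shared->logicalrep_worker_slot_no = slot_no;
	shared->logicalrep_worker_generation = generation;
	sketch_unlock(shared);
}

/* Hypothetical getter: both fields are read as a consistent pair. */
void
sketch_get_worker_slot(SharedInfoSketch *shared, int *slot_no,
					   unsigned short *generation)
{
	sketch_lock(shared);
	*slot_no = shared->logicalrep_worker_slot_no;
	*generation = shared->logicalrep_worker_generation;
	sketch_unlock(shared);
}
```

The benefit would be that callers cannot forget the lock or read only one of
the two fields; the cost is an extra pair of functions for the few call sites
that exist today.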

> 13. - LogicalParallelApplyLoop
>
> + /*
> + * Init the ApplyMessageContext which we clean up after each replication
> + * protocol message.
> + */
> + ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> + "ApplyMessageContext",
> + ALLOCSET_DEFAULT_SIZES);
>
> Because this is in the parallel apply worker should the name (e.g. the 2nd
> param) be changed to "ParallelApplyMessageContext"?

Not sure about this, because ApplyMessageContext is used in both worker.c and
applyparallelworker.c.

> + else if (is_subworker)
> + snprintf(bgw.bgw_name, BGW_MAXLEN,
> + "logical replication parallel apply worker for subscription %u",
> + subid);
> else
> snprintf(bgw.bgw_name, BGW_MAXLEN,
> "logical replication worker for subscription %u", subid);
>
> I think that *last* text should now be changed like below:
>
> BEFORE
> "logical replication worker for subscription %u"
> AFTER
> "logical replication apply worker for subscription %u"

I am not sure if it's a good idea to change the existing process description.

> 36 - should_apply_changes_for_rel
> should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
> {
> if (am_tablesync_worker())
> return MyLogicalRepWorker->relid == rel->localreloid;
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errmsg("logical replication apply workers for subscription \"%s\" will restart",
> + MySubscription->name),
> + errdetail("Cannot handle streamed replication transaction using parallel "
> + "apply workers until all tables are synchronized.")));
> +
> + return true;
> + }
> else
> return (rel->state == SUBREL_STATE_READY ||
> (rel->state == SUBREL_STATE_SYNCDONE &&
> @@ -427,43 +519,87 @@ end_replication_step(void)
>
> This function can be made tidier just by removing all the 'else' ...

I feel the current style looks better.

> 40. - apply_handle_stream_prepare
>
> + case TRANS_LEADER_SERIALIZE:
>
> - /* Mark the transaction as prepared. */
> - apply_handle_prepare_internal(&prepare_data);
> + /*
> + * The transaction has been serialized to file, so replay all the
> + * spooled operations.
> + */
>
> Spurious blank line after the 'case'.

Personally, I think this style is fine.

> 48. - ApplyWorkerMain
>
> +/* Logical Replication Apply worker entry point */
> +void
> +ApplyWorkerMain(Datum main_arg)
>
> "Apply worker" -> "apply worker"

Since it's an existing comment, I feel we can leave this.

> + /*
> + * We don't currently need any ResourceOwner in a walreceiver process, but
> + * if we did, we could call CreateAuxProcessResourceOwner here.
> + */
>
> I think this comment should have "XXX" prefix.

I am not sure as this comment is just a reminder.

> 50.
>
> + if (server_version >= 160000 &&
> + MySubscription->stream == SUBSTREAM_PARALLEL)
> + {
> + options.proto.logical.streaming = pstrdup("parallel");
> + MyLogicalRepWorker->parallel_apply = true;
> + }
> + else if (server_version >= 140000 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> + options.proto.logical.streaming = pstrdup("on");
> + else
> + options.proto.logical.streaming = NULL;
>
> IMO it might make more sense for these conditions to be checking the
> 'options.proto.logical.proto_version' here instead of checking the hardwired
> server versions. Also, I suggest it may be better (for clarity) to always
> assign the parallel_apply member.

Currently, the proto_version is only checked on the publisher, so I am not sure if
it's a good idea to check it here.
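
To make the second half of that suggestion concrete, here is a minimal
standalone sketch (not code from the patch) of the same decision logic as the
quoted hunk, but with the flag assigned on every path. The enum
SketchStreamMode and the function sketch_choose_streaming are hypothetical
names; the version thresholds 160000 and 140000 and the strings "parallel" and
"on" are taken from the hunk above.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the subscription streaming modes. */
typedef enum SketchStreamMode
{
	SKETCH_STREAM_OFF,
	SKETCH_STREAM_ON,
	SKETCH_STREAM_PARALLEL
} SketchStreamMode;

/*
 * Pick the streaming option string for the given publisher version and
 * subscription mode. Returns NULL when streaming should not be requested.
 */
const char *
sketch_choose_streaming(int server_version, SketchStreamMode mode,
						bool *parallel_apply)
{
	/* Always assign the flag so it can never hold a stale value. */
	*parallel_apply = (server_version >= 160000 &&
					   mode == SKETCH_STREAM_PARALLEL);

	if (*parallel_apply)
		return "parallel";
	if (server_version >= 140000 && mode != SKETCH_STREAM_OFF)
		return "on";
	return NULL;
}
```

Note that, as in the quoted hunk, a subscription in parallel mode talking to a
publisher older than 16 but at least 14 falls back to "on".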

> 52. - get_transaction_apply_action
>
> + /*
> + * Check if we are processing this transaction using a parallel apply
> + * worker and if so, send the changes to that worker.
> + */
> + else if ((*winfo = parallel_apply_find_worker(xid)))
> + {
> + return TRANS_LEADER_SEND_TO_PARALLEL;
> + }
> + else
> + {
> + return TRANS_LEADER_SERIALIZE;
> + }
> +}
>
> 52a.
> All these if/else and code blocks seem excessive. It can be simplified as follows:

I feel this style is fine.

> 52b.
> Can a tablesync worker ever get here? It might be better to
> Assert(!am_tablesync_worker()); at top of this function?

Not sure if it's necessary or not.

> 55b.
> IMO this member name should be named slightly different to give a better feel
> for what it really means.
>
> Maybe something like one of:
> "parallel_apply_ok"
> "parallel_apply_enabled"
> "use_parallel_apply"
> etc?

I feel the current name is fine. But if others also feel the same, I can try to
rename it.

> 57. - am_leader_apply_worker
>
> +static inline bool
> +am_leader_apply_worker(void)
> +{
> + return (!OidIsValid(MyLogicalRepWorker->relid) &&
> + !isParallelApplyWorker(MyLogicalRepWorker));
> +}
>
> I wondered if it would be tidier/easier to define this function like below.
> The others are inline functions anyhow so it should end up as the same
> thing, right?
>
> static inline bool
> am_leader_apply_worker(void)
> {
> return (!am_tablesync_worker() && !am_parallel_apply_worker());
> }

I feel the current style is fine.

> --- fail - streaming must be boolean
> +-- fail - streaming must be boolean or 'parallel'
> CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
>
> I think there are tests already for explicitly create/set the subscription
> parameter streaming = on/off/parallel
>
> But what about when there is no value explicitly specified? Shouldn't there
> also be tests like below to check that *implied* boolean true still works for
> this enum?

I didn't find similar existing tests for cases where no value is explicitly
specified, so I haven't added this for now.

Attached is the new version of the patch set, which addresses most of the comments.

[1] https://www.postgresql.org/message-id/CAA4eK1LMVdS6uM7Tw7ANL0BetAd76TKkmAXNNQa0haTe2tax6g%40mail.gmail.com

Best regards,
Hou zj

Attachment Content-Type Size
v36-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 74.5 KB
v36-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 50.4 KB
v36-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 61.4 KB
v36-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 8.7 KB
v36-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 137.4 KB
