Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
Cc: Michael Paquier <michael(at)paquier(dot)xyz>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2019-12-11 11:52:28
Message-ID: CAA4eK1J_YwXrenOjEVF-uz0+2YVdQD2e8mOsyvSfiOXkhjzMtA@mail.gmail.com
Lists: pgsql-hackers

On Mon, Dec 9, 2019 at 1:27 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
>
> I have reviewed the patch set, and here are a few comments/questions:
>
> 1.
> +static void
> +pg_decode_stream_change(LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + Relation relation,
> + ReorderBufferChange *change)
> +{
> + OutputPluginPrepareWrite(ctx, true);
> + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
> + OutputPluginWrite(ctx, true);
> +}
>
> Should we show the tuple in the streamed change like we do for the
> pg_decode_change?
>

I think so. The patch shows the message in
pg_decode_stream_message(), so why not show the tuple here as well?

> 2. pg_logical_slot_get_changes_guts
> It recreates the decoding slot [ctx =
> CreateDecodingContext(InvalidXLogRecPtr, ...)] but doesn't set streaming
> to false. Should we pass a parameter to
> pg_logical_slot_get_changes_guts saying whether we want streamed results or not?
>

CreateDecodingContext internally calls StartupDecodingContext, which
sets the value of streaming based on whether the plugin has provided
callbacks for the streaming functions. Isn't that sufficient? Why do we
need an additional parameter here?
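
A minimal sketch of this point, with hypothetical names throughout (SketchStreamCallbacks, sketch_streaming_enabled): the streaming decision is derived only from the callbacks the plugin registered, so a caller-supplied flag would be redundant. It assumes the required set is start/stop/abort/commit/change, with the message and truncate callbacks optional as the 0003 documentation hunk suggests; the exact rule in the patch may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical, simplified stand-in for the stream part of the output
 * plugin callback table; real callbacks take arguments and the real
 * struct has more members.
 */
typedef void (*SketchCB) (void);

typedef struct SketchStreamCallbacks
{
    SketchCB    stream_start_cb;
    SketchCB    stream_stop_cb;
    SketchCB    stream_abort_cb;
    SketchCB    stream_commit_cb;
    SketchCB    stream_change_cb;
    SketchCB    stream_message_cb;  /* optional (assumed) */
    SketchCB    stream_truncate_cb; /* optional (assumed) */
} SketchStreamCallbacks;

/* Streaming depends only on what the plugin registered: no caller flag. */
static bool
sketch_streaming_enabled(const SketchStreamCallbacks *cb)
{
    /* the optional message/truncate callbacks do not affect the decision */
    return cb->stream_start_cb != NULL &&
        cb->stream_stop_cb != NULL &&
        cb->stream_abort_cb != NULL &&
        cb->stream_commit_cb != NULL &&
        cb->stream_change_cb != NULL;
}

/* Placeholder callback used only for demonstration. */
static void
sketch_noop_cb(void)
{
}
```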

> 3.
> + XLogRecPtr prev_lsn = InvalidXLogRecPtr;
> ReorderBufferChange *change;
> ReorderBufferChange *specinsert = NULL;
>
> @@ -1565,6 +1965,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> Relation relation = NULL;
> Oid reloid;
>
> + /*
> + * Enforce correct ordering of changes, merged from multiple
> + * subtransactions. The changes may have the same LSN due to
> + * MULTI_INSERT xlog records.
> + */
> + if (prev_lsn != InvalidXLogRecPtr)
> + Assert(prev_lsn <= change->lsn);
> +
> + prev_lsn = change->lsn;
> I did not understand how this change is relevant to this patch.
>

This is just to ensure that changes are in LSN order. I think that, as we
are merging the changes before commit for streaming, it is good to
have such an assertion in ReorderBufferStreamTXN. And if we want
it in ReorderBufferStreamTXN, then there is no harm in keeping
it in ReorderBufferCommit() as well, if only to keep the code consistent.
Do you see any problem with this?
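
To make the invariant concrete, here is a sketch assuming simplified, hypothetical types (SketchChange, sketch_merge_by_lsn): it merges the changes of a top-level transaction and one subtransaction by LSN and asserts the same condition as the hunk. Equal LSNs pass, which is the MULTI_INSERT case. The real reorder buffer iterator merges the top-level transaction and all its subtransactions through a binary heap; this two-way merge only illustrates the ordering check.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* 64-bit WAL position */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, stripped-down change: only the LSN matters here. */
typedef struct SketchChange
{
    XLogRecPtr  lsn;
    int         id;
} SketchChange;

/*
 * Merge two LSN-sorted change arrays into out[], asserting that LSNs
 * never go backwards (equal LSNs are allowed).  Returns the number of
 * changes written; out[] must have room for na + nb entries.
 */
static size_t
sketch_merge_by_lsn(const SketchChange *a, size_t na,
                    const SketchChange *b, size_t nb,
                    SketchChange *out)
{
    size_t      i = 0, j = 0, n = 0;
    XLogRecPtr  prev_lsn = InvalidXLogRecPtr;

    while (i < na || j < nb)
    {
        const SketchChange *next;

        /* take from a when b is exhausted or a's head is not later */
        if (j >= nb || (i < na && a[i].lsn <= b[j].lsn))
            next = &a[i++];
        else
            next = &b[j++];

        /* same check as the hunk: enforce LSN order of merged changes */
        if (prev_lsn != InvalidXLogRecPtr)
            assert(prev_lsn <= next->lsn);
        prev_lsn = next->lsn;

        out[n++] = *next;
    }
    return n;
}
```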

> 4.
> + /*
> + * TOCHECK: We have to rebuild historic snapshot to be sure it includes all
> + * information about subtransactions, which could arrive after streaming start.
> + */
> + if (!txn->is_schema_sent)
> + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
> + txn, command_id);
>
> In which case will txn->is_schema_sent be true? At the end of
> the stream, ReorderBufferExecuteInvalidations always sets
> it to false,
> so when sending the next stream it will always be false. That means we
> never need the snapshot_now variable in ReorderBufferTXN.
>

You are probably right, but as discussed, we need to change this part
of the design/code (when to send schema changes) due to the issues
discovered. So I think this part will change anyway when we fix that
problem.

> 5.
> @@ -2299,6 +2746,23 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer
> *rb, TransactionId xid,
> txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
>
> txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
> +
> + /*
> + * We read catalog changes from WAL, which are not yet sent, so
> + * invalidate current schema in order output plugin can resend
> + * schema again.
> + */
> + txn->is_schema_sent = false;
>
> Same as point 4: at decode time it will never be true.
>

Sure, my previous point's reply applies here as well.

> 6.
> + /* send fields */
> + pq_sendint64(out, commit_lsn);
> + pq_sendint64(out, txn->end_lsn);
> + pq_sendint64(out, txn->commit_time);
>
> commit_time and end_lsn are used in standby_feedback
>

I don't understand what you mean by this. Can you be a bit more clear?

>
> 7.
> + /* FIXME optimize the search by bsearch on sorted data */
> + for (i = nsubxacts; i > 0; i--)
> + {
> + if (subxacts[i - 1].xid == subxid)
> + {
> + subidx = (i - 1);
> + found = true;
> + break;
> + }
> + }
> We cannot roll back an intermediate subtransaction without rolling back
> the latest subtransaction, so why do we need
> to search the array? It will always be the last subxact, no?
>

The same thing is already mentioned in the comments above this code
("XXX Or perhaps we can rely on the aborts to arrive in the reverse
order, i.e. from the inner-most subxact (when nested)? In which case
we could simply check the last element."). I think what you are
saying is probably right, but we can leave this as it is for now
because this is a minor optimization which can be done later as well
if required. However, if you see any correctness issue, then we can
discuss.
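
For reference, a sketch of the lookup with hypothetical names (SketchSubXactInfo, sketch_find_subxact), mirroring the loop in the patch. Because the scan starts from the end, if aborts do arrive innermost-first the match is found on the first iteration, which supports treating "check only the last element" as a minor optimization.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical per-subxact bookkeeping entry, modelled on the patch's array. */
typedef struct SketchSubXactInfo
{
    TransactionId xid;
} SketchSubXactInfo;

/*
 * Return the index of subxid in subxacts[], or -1 if it is absent.
 * The scan runs from the end, so index nsubxacts - 1 is tried first.
 */
static int
sketch_find_subxact(const SketchSubXactInfo *subxacts, int nsubxacts,
                    TransactionId subxid)
{
    for (int i = nsubxacts; i > 0; i--)
    {
        if (subxacts[i - 1].xid == subxid)
            return i - 1;
    }
    return -1;                  /* not found */
}
```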

> 8.
> + /*
> + * send feedback to upstream
> + *
> + * XXX Probably should send a valid LSN. But which one?
> + */
> + send_feedback(InvalidXLogRecPtr, false, false);
>
> Why is feedback sent for every change?
>

I will study this part of the patch and let you know my opinion.

A few comments on this patch series:

0001-Immediately-WAL-log-assignments:
------------------------------------------------------------

The commit message still refers to the old design for this patch. I
think you need to modify the commit message as per the latest patch.

0002-Issue-individual-invalidations-with-wal_level-log
----------------------------------------------------------------------------
1.
xact_desc_invalidations(StringInfo buf,
{
..
+ else if (msg->id == SHAREDINVALSNAPSHOT_ID)
+ appendStringInfo(buf, " snapshot %u", msg->sn.relId);

You have removed logging for the above cache but forgot to remove its
reference from one of the places. Also, I think you need to add a
comment somewhere in inval.c to say why you are writing WAL for
some types of invalidations and not for others.

0003-Extend-the-output-plugin-API-with-stream-methods
--------------------------------------------------------------------------------
1.
+ are required, while <function>stream_message_cb</function> and
+ <function>stream_message_cb</function> are optional.

stream_message_cb is mentioned twice. It seems the second one is for truncate.

2.
size of the transaction size and network bandwidth, the transfer time
+ may significantly increase the apply lag.

/size of the transaction size/size of the transaction

No need to mention size twice.

3.
+ Similarly to spill-to-disk behavior, streaming is triggered when the total
+ amount of changes decoded from the WAL (for all in-progress transactions)
+ exceeds limit defined by <varname>logical_work_mem</varname> setting.

The GUC name used is wrong. Also, /Similarly to/Similar to/
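
As an illustration of the trigger described in that paragraph, a sketch under stated assumptions: the types and functions are hypothetical, limit_bytes stands in for the memory-limit GUC (deliberately not named, given the naming issue above), and choosing the largest transaction as the victim is an assumed policy. Streaming a transaction is modelled as simply releasing its in-memory changes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical transaction: bytes of decoded changes held in memory. */
typedef struct SketchTXN
{
    size_t      size;
    int         streamed;       /* how many times this txn was streamed */
} SketchTXN;

/* Index of the transaction using the most memory, or -1 if all are empty. */
static int
sketch_largest_txn(const SketchTXN *txns, int ntxns)
{
    int         best = -1;

    for (int i = 0; i < ntxns; i++)
        if (txns[i].size > 0 && (best < 0 || txns[i].size > txns[best].size))
            best = i;
    return best;
}

/*
 * While the total over all in-progress transactions exceeds limit_bytes,
 * "stream" the largest one, freeing its changes.  *total must equal the
 * sum of txns[].size.  Returns the number of transactions streamed.
 */
static int
sketch_check_memory_limit(SketchTXN *txns, int ntxns, size_t *total,
                          size_t limit_bytes)
{
    int         nstreamed = 0;

    while (*total > limit_bytes)
    {
        int         victim = sketch_largest_txn(txns, ntxns);

        assert(victim >= 0);    /* total > 0 implies some txn holds memory */
        *total -= txns[victim].size;
        txns[victim].size = 0;
        txns[victim].streamed++;
        nstreamed++;
    }
    return nstreamed;
}
```

The same check is where the non-streaming path would spill to disk instead; only the action taken on the chosen transaction differs.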

4.
stream_start_cb_wrapper()
{
..
+ /* state.report_location = apply_lsn; */
..
+ /* FIXME ctx->write_location = apply_lsn; */
..
}

See if we can fix these, and similar ones in the callback for stop. I
think we don't have final_lsn until we commit/abort. Can we compute it
before calling these APIs?

0005-Gracefully-handle-concurrent-aborts-of-uncommitte
----------------------------------------------------------------------------------
1.
@@ -1877,6 +1877,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_CATCH();
{
/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
+
if (iterstate)
ReorderBufferIterTXNFinish(rb, iterstate);

Spurious line change.

2. The commit message of this patch refers to Prepared transactions.
I think that needs to be changed.

0006-Implement-streaming-mode-in-ReorderBuffer
-------------------------------------------------------------------------
1.
+
+/* iterator for streaming (only get data from memory) */
+static ReorderBufferStreamIterTXNState * ReorderBufferStreamIterTXNInit(
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+static ReorderBufferChange *ReorderBufferStreamIterTXNNext(
+ ReorderBuffer *rb,
+ ReorderBufferStreamIterTXNState * state);
+
+static void ReorderBufferStreamIterTXNFinish(
+ ReorderBuffer *rb,
+ ReorderBufferStreamIterTXNState * state);

Do we really need to introduce new APIs for iterating over changes
from streamed transactions? Why can't we reuse the same APIs that we
use for committed xacts?

2.
+static void
+ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)

Please write some comments atop ReorderBufferStreamCommit.

3.
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
..
..
+ if (txn->snapshot_now == NULL)
+ {
+ dlist_iter subxact_i;
+
+ /* make sure this transaction is streamed for the first time */
+ Assert(!rbtxn_is_streamed(txn));
+
+ /* at the beginning we should have invalid command ID */
+ Assert(txn->command_id == InvalidCommandId);
+
+ dlist_foreach(subxact_i, &txn->subtxns)
+ {
+ ReorderBufferTXN *subtxn;
+
+ subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
+
+ if (subtxn->base_snapshot != NULL &&
+ (txn->base_snapshot == NULL ||
+ txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
+ {
+ txn->base_snapshot = subtxn->base_snapshot;

The logic here seems to be correct, but I am not sure why we don't
purge the base snapshot before assigning the subtxn's snapshot, and
similarly, why we don't purge the subtxn's snapshot once we are done
with it. I think we can use ReorderBufferTransferSnapToParent to
replace part of the logic here. Do you see any reason for doing
things differently here?
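
A simplified model of what ReorderBufferTransferSnapToParent() does, to show the purging that the loop above skips. The types and names are hypothetical, a plain refcount stands in for SnapBuildSnapDecRefcount(), and the real function also keeps transactions ordered by base-snapshot LSN in a list, which this sketch omits.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define SKETCH_INVALID_LSN ((XLogRecPtr) 0)

/* Hypothetical refcounted snapshot. */
typedef struct SketchSnapshot
{
    int         refcount;
} SketchSnapshot;

/* Hypothetical transaction: just its base snapshot and that snapshot's LSN. */
typedef struct SketchTXN
{
    SketchSnapshot *base_snapshot;  /* NULL if none */
    XLogRecPtr  base_snapshot_lsn;
} SketchTXN;

static void
sketch_snap_decref(SketchSnapshot *snap)
{
    assert(snap->refcount > 0);
    snap->refcount--;
}

/*
 * Keep whichever base snapshot is older on the parent, drop the reference
 * to the other one, and leave the subtransaction without a snapshot.
 */
static void
sketch_transfer_snap_to_parent(SketchTXN *txn, SketchTXN *subtxn)
{
    if (subtxn->base_snapshot == NULL)
        return;

    if (txn->base_snapshot == NULL ||
        subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
    {
        /* parent's snapshot is newer (or absent): purge it, adopt subtxn's */
        if (txn->base_snapshot != NULL)
            sketch_snap_decref(txn->base_snapshot);
        txn->base_snapshot = subtxn->base_snapshot;
        txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
    }
    else
    {
        /* parent's snapshot is older, so the subtxn's is not needed */
        sketch_snap_decref(subtxn->base_snapshot);
    }

    /* either way, the subtransaction no longer owns a snapshot */
    subtxn->base_snapshot = NULL;
    subtxn->base_snapshot_lsn = SKETCH_INVALID_LSN;
}
```

In ReorderBufferStreamTXN, the loop over txn->subtxns could then call the real function once per subtransaction instead of copying the snapshot pointer.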

4. In ReorderBufferStreamTXN, why do you need to use
ReorderBufferCopySnap to assign txn->base_snapshot to snapshot_now?

5. I see a lot of code similarity between ReorderBufferStreamTXN and
the existing ReorderBufferCommit. I understand that there are some subtle
differences due to which we need to write this new function, but can't
we encapsulate the specific parts of the code in functions and then call
them from both places? I am talking about the code in the different cases
for change->action.
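
One possible shape for that refactoring, as a sketch with hypothetical types (SketchChange, SketchPathCallbacks, sketch_apply_change): the per-action switch lives in a single function, and the commit path and the streaming path differ only in which callback set they pass in (for example the regular change/message callbacks versus their stream_* counterparts). The two counting callbacks exist only to demonstrate the dispatch.

```c
#include <assert.h>

/* Hypothetical subset of change actions. */
typedef enum SketchAction
{
    SKETCH_INSERT,
    SKETCH_UPDATE,
    SKETCH_DELETE,
    SKETCH_MESSAGE
} SketchAction;

typedef struct SketchChange
{
    SketchAction action;
} SketchChange;

typedef void (*SketchCB) (const SketchChange *change, void *arg);

/* Callbacks for one path: either the commit path or the streaming path. */
typedef struct SketchPathCallbacks
{
    SketchCB    change_cb;
    SketchCB    message_cb;
} SketchPathCallbacks;

/*
 * Per-action handling shared by both paths.  Work common to the two would
 * live here once; the caller decides which callback set gets the result.
 */
static void
sketch_apply_change(const SketchPathCallbacks *cb, const SketchChange *change,
                    void *arg)
{
    switch (change->action)
    {
        case SKETCH_INSERT:
        case SKETCH_UPDATE:
        case SKETCH_DELETE:
            cb->change_cb(change, arg);
            break;
        case SKETCH_MESSAGE:
            cb->message_cb(change, arg);
            break;
    }
}

/* Demo callbacks: count invocations in an int[2] passed through arg. */
static void
sketch_count_change(const SketchChange *change, void *arg)
{
    (void) change;
    ((int *) arg)[0]++;
}

static void
sketch_count_message(const SketchChange *change, void *arg)
{
    (void) change;
    ((int *) arg)[1]++;
}
```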

6. + * Note: We never stream and serialize a transaction at the same time (e
/(e/(we

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
