Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Kuntal Ghosh <kuntalghosh(dot)2007(at)gmail(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date: 2020-05-17 07:10:46
Message-ID: CAFiTN-sqo+UaEzP0y5sAvaLd60awR55fgt6HFN+RR1MojRLV1w@mail.gmail.com
Lists: pgsql-hackers

On Fri, May 15, 2020 at 4:04 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Fri, May 15, 2020 at 2:47 PM Dilip Kumar <dilipbalaut(at)gmail(dot)com> wrote:
> >
> > > 6. I think it will be good if we can provide an example of streaming
> > > changes via test_decoding at
> > > https://www.postgresql.org/docs/devel/test-decoding.html. I think we
> > > can also explain there why the user is not expected to see the actual
> > > data in the stream.
> >
> > I have a few problems to solve here.
> > - With a streaming transaction, should we also show the actual values,
> > or should we do as the patch currently does
> > (appendStringInfo(ctx->out, "streaming change for TXN %u",
> > txn->xid);)? I think we should show the actual values instead of what
> > we are doing now.
> >
>
> I think the reason we don't want to display the tuple at this stage is
> that it is not yet clear whether the transaction will commit or
> abort. I am not sure if displaying the contents of aborted
> transactions is a good idea, but if there is a reason for doing so, we
> can do it later as well.
>
> > - We cannot show a real example, because to make an in-progress
> > transaction stream its changes we would have to insert a lot of
> > tuples. Can we show partial output instead?
> >
>
> I think we can display what the API will actually display; what is the
> confusion here?

I have added an example in the v22-0011 patch, which adds the API to get
the streaming changes.
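To make the placeholder output concrete, here is a minimal C sketch of formatting the line that the patch's test_decoding callback emits for each streamed change, using the format string quoted above. The helper name format_stream_change() and the simplified TransactionId typedef are illustrative assumptions; the real callback appends to ctx->out with appendStringInfo() rather than writing into a caller buffer.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-in for PostgreSQL's TransactionId (a 32-bit XID). */
typedef uint32_t TransactionId;

/*
 * Hypothetical helper: format the placeholder line the patch's
 * test_decoding stream_change callback emits instead of the tuple data.
 * Returns what snprintf() returns: the length of the full formatted line.
 */
static int
format_stream_change(char *buf, size_t len, TransactionId xid)
{
    assert(buf != NULL && len > 0);
    return snprintf(buf, len, "streaming change for TXN %u", (unsigned int) xid);
}
```

Printing only the XID keeps the output independent of whether the transaction later commits or aborts, which is the concern raised above.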

> I have a few more comments on the previous version of patch
> v20-0005-Implement-streaming-mode-in-ReorderBuffer. If you have fixed
> any, then leave those and fix others.
>
> Review comments:
> ------------------------------
> 1.
> @@ -1762,10 +1952,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
> }
>
> case REORDER_BUFFER_CHANGE_MESSAGE:
> - rb->message(rb, txn, change->lsn, true,
> - change->data.msg.prefix,
> - change->data.msg.message_size,
> - change->data.msg.message);
> + if (streaming)
> + rb->stream_message(rb, txn, change->lsn, true,
> + change->data.msg.prefix,
> + change->data.msg.message_size,
> + change->data.msg.message);
> + else
> + rb->message(rb, txn, change->lsn, true,
> + change->data.msg.prefix,
> + change->data.msg.message_size,
> + change->data.msg.message);
>
> Don't we need to set the any_data_sent flag while streaming messages as
> we do for other types of changes?

I think any_data_sent was added to avoid sending an abort to the
subscriber if we haven't sent any data, but this is not complete, as
the output plugin can also decide not to send a change. So I think
this should not be done as part of this patch and can be handled
separately. There is already a thread for handling this [1].
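The callback selection in the quoted hunk can be sketched in plain C as below. The Buffer struct, the counting callbacks, and dispatch_message() are hypothetical simplifications (the real callbacks also take txn, lsn and the transactional flag), and, in line with the reply above, the sketch deliberately does not track any_data_sent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct Buffer Buffer;

/* Simplified shape of the message callbacks (real ones also take txn, lsn, transactional). */
typedef void (*MessageCB) (Buffer *rb, const char *prefix, size_t sz, const char *message);

struct Buffer
{
    MessageCB message;          /* regular decoding of a committed transaction */
    MessageCB stream_message;   /* streaming of an in-progress transaction */
    int       n_message;        /* calls counted by the example callbacks */
    int       n_stream_message;
};

static void
count_message(Buffer *rb, const char *prefix, size_t sz, const char *message)
{
    (void) prefix; (void) sz; (void) message;
    rb->n_message++;
}

static void
count_stream_message(Buffer *rb, const char *prefix, size_t sz, const char *message)
{
    (void) prefix; (void) sz; (void) message;
    rb->n_stream_message++;
}

/* Pick the callback the same way as the if/else in the quoted hunk. */
static void
dispatch_message(Buffer *rb, bool streaming,
                 const char *prefix, size_t sz, const char *message)
{
    MessageCB cb = streaming ? rb->stream_message : rb->message;

    assert(cb != NULL);
    cb(rb, prefix, sz, message);
}
```

Selecting the function pointer once avoids duplicating the long argument list in both branches.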

> 2.
> + if (streaming)
> + {
> + /*
> + * Set the last of the stream as the final lsn before calling
> + * stream stop.
> + */
> + if (!XLogRecPtrIsInvalid(prev_lsn))
> + txn->final_lsn = prev_lsn;
> + rb->stream_stop(rb, txn);
> + }
>
> I am not sure if it is good to use final_lsn for this purpose. See
> comments for this variable in reorderbuffer.h. Basically, it is used
> for a specific purpose on different occasions. Now, if we want to
> start using it for a new purpose, we need to study its interaction
> with all other places and update the comments as well. Can we pass an
> additional parameter to stream_stop() instead?

Done
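A minimal sketch of the suggested shape, assuming simplified stand-ins for XLogRecPtr and the transaction struct: the LSN of the last streamed change is handed to the stop callback as an extra argument, so final_lsn keeps its documented meaning. The names stream_stop(), stream_batch() and last_stop_lsn are illustrative, not the patch's code.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;                /* stand-in for PostgreSQL's type */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef struct TXN
{
    XLogRecPtr final_lsn;       /* keeps its documented meaning; not touched here */
    XLogRecPtr last_stop_lsn;   /* recorded by the example stop callback */
} TXN;

/* Hypothetical stop callback taking the LSN of the last streamed change. */
static void
stream_stop(TXN *txn, XLogRecPtr last_lsn)
{
    txn->last_stop_lsn = last_lsn;
}

/* Stream one batch of changes, then close the stream with its last LSN. */
static void
stream_batch(TXN *txn, const XLogRecPtr *lsns, int n)
{
    XLogRecPtr prev_lsn = InvalidXLogRecPtr;

    for (int i = 0; i < n; i++)
    {
        assert(lsns[i] > prev_lsn);     /* changes arrive in LSN order */
        /* ... send the change at lsns[i] ... */
        prev_lsn = lsns[i];
    }

    /* prev_lsn stays InvalidXLogRecPtr if the batch was empty */
    stream_stop(txn, prev_lsn);
}
```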

> 3.
> + /* remember the command ID and snapshot for the streaming run */
> + txn->command_id = command_id;
> +
> + /* Avoid copying if it's already copied. */
> + if (snapshot_now->copied)
> + txn->snapshot_now = snapshot_now;
> + else
> + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
> + txn, command_id);
>
> This code is used in two different places; can we move it into
> a single function?

Done
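The factored-out helper could look roughly like this sketch. Snap, TXN, copy_snap() and save_txn_snapshot() are hypothetical simplifications; copy_snap() only stands in for ReorderBufferCopySnap() by producing a private copy flagged as copied.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t CommandId;     /* stand-in for PostgreSQL's CommandId */

typedef struct Snap
{
    bool      copied;           /* true if this is already a private copy */
    CommandId curcid;
} Snap;

typedef struct TXN
{
    CommandId command_id;
    Snap     *snapshot_now;
} TXN;

/* Stand-in for ReorderBufferCopySnap(): private copy marked as copied. */
static Snap *
copy_snap(const Snap *orig, CommandId cid)
{
    Snap *s = malloc(sizeof(Snap));

    assert(s != NULL);
    *s = *orig;
    s->copied = true;
    s->curcid = cid;
    return s;
}

/* Hypothetical single helper replacing the two duplicated code blocks. */
static void
save_txn_snapshot(TXN *txn, Snap *snapshot_now, CommandId command_id)
{
    /* remember the command ID and snapshot for the next streaming run */
    txn->command_id = command_id;

    if (snapshot_now->copied)
        txn->snapshot_now = snapshot_now;   /* already copied: reuse it */
    else
        txn->snapshot_now = copy_snap(snapshot_now, command_id);
}
```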

> 4.
> In ReorderBufferProcessTXN(), the patch is calling stream_stop in both
> the try and catch block. If there is an error after calling it in a
> try block, we might call it again via catch. I think that will lead
> to sending a stop message twice. Won't that be a problem? See the
> usage of iterstate in the catch block, we have made it safe from a
> similar problem.

IMHO, we don't need that, because we only call stream_stop in the
catch block if the error code is ERRCODE_TRANSACTION_ROLLBACK. So if
the TRY block has already stopped the stream, we should not get that
error. I have added comments explaining this.
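The argument can be illustrated with a setjmp/longjmp sketch that only mimics PG_TRY/PG_CATCH. Everything here is hypothetical: ERR_ROLLBACK stands in for ERRCODE_TRANSACTION_ROLLBACK, the concurrent abort is modeled as being raised only while changes are processed, and any later failure is modeled as a different error. Under those assumptions the catch block calls stream_stop() only for the rollback case, so the stream is closed exactly once.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>

enum { ERR_NONE, ERR_ROLLBACK, ERR_OTHER };

static jmp_buf catch_point;     /* stand-in for the PG_TRY() jump target */
static int pending_err;         /* stand-in for the error data's sqlerrcode */
static int stop_calls;

static void
raise_error(int code)
{
    pending_err = code;
    longjmp(catch_point, 1);
}

static void
stream_stop(void)
{
    stop_calls++;
    assert(stop_calls == 1);    /* the property discussed: one stop per run */
}

/* Returns ERR_NONE, ERR_ROLLBACK (swallowed), or the error that is re-thrown. */
static int
process_txn(int nchanges, int rollback_at, bool fail_after_stop)
{
    stop_calls = 0;
    pending_err = ERR_NONE;

    if (setjmp(catch_point) == 0)
    {
        /* PG_TRY(): decode and stream each change */
        for (int i = 0; i < nchanges; i++)
            if (i == rollback_at)
                raise_error(ERR_ROLLBACK);  /* concurrent abort detected */
        stream_stop();
        if (fail_after_stop)
            raise_error(ERR_OTHER);         /* modeled as never a rollback */
        return ERR_NONE;
    }

    /* PG_CATCH(): the patch first discards the streamed changes here */
    if (pending_err != ERR_ROLLBACK)
        return pending_err;                 /* re-throw, no second stop */
    stream_stop();                          /* close the still-open stream */
    return ERR_ROLLBACK;
}
```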

> 5.
> + if (streaming)
> + {
> + /* Discard the changes that we just streamed. */
> + ReorderBufferTruncateTXN(rb, txn);
>
> - PG_RE_THROW();
> + /* Re-throw only if it's not an abort. */
> + if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK)
> + {
> + MemoryContextSwitchTo(ecxt);
> + PG_RE_THROW();
> + }
> + else
> + {
> + FlushErrorState();
> + FreeErrorData(errdata);
> + errdata = NULL;
> +
>
> I think here we can write a few comments on why we are doing error-code
> specific handling; basically, explain a bit about concurrent abort
> handling and/or refer to the part of the comments where it is explained.

Done

> 6.
> PG_CATCH();
> {
> + MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
> + ErrorData *errdata = CopyErrorData();
>
> I don't understand the usage of memory context in this part of the
> code. Basically, you are switching to CurrentMemoryContext here, do
> some error handling and then again reset back to some random context
> before rethrowing the error. If there is some purpose for it, then it
> might be better if you can write a few comments to explain the same.

Basically, ccxt is the CurrentMemoryContext when we started the
streaming, and ecxt is the context when we catch the error. Before
this change, we would rethrow in the context in which we caught the
error, i.e. ecxt. So what we are trying to do is switch back to the
normal context (ccxt) and copy the error data there. If we are not
handling the error gracefully, we switch back to the context we were
in (ecxt) and rethrow.
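The context juggling described above can be modeled with a toy "current context" pointer. Ctx, ctx_switch_to(), error_copy_context() and handle_error() are hypothetical stand-ins for MemoryContext, MemoryContextSwitchTo() and CopyErrorData(); the sketch only tracks which context is current, not real allocation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a PostgreSQL memory context: just a name. */
typedef struct Ctx { const char *name; } Ctx;

static Ctx *CurrentCtx;

/* Like MemoryContextSwitchTo(): make ctx current, return the previous one. */
static Ctx *
ctx_switch_to(Ctx *ctx)
{
    Ctx *old = CurrentCtx;

    CurrentCtx = ctx;
    return old;
}

/* Stand-in for CopyErrorData(): the copy is made in the current context. */
static Ctx *
error_copy_context(void)
{
    return CurrentCtx;
}

/*
 * Catch-block pattern: ccxt is the context saved when streaming started.
 * Returns true if the error would be re-thrown; *errdata_ctx reports
 * which context the error copy lives in.
 */
static bool
handle_error(Ctx *ccxt, bool is_rollback, Ctx **errdata_ctx)
{
    Ctx *ecxt;

    assert(ccxt != NULL);
    ecxt = ctx_switch_to(ccxt);         /* leave the error context */
    *errdata_ctx = error_copy_context();

    if (!is_rollback)
    {
        ctx_switch_to(ecxt);            /* restore, then PG_RE_THROW() */
        return true;
    }
    return false;                       /* handled: stay in ccxt */
}
```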

>
> 7.
> +ReorderBufferCommit()
> {
> ..
> + /*
> + * If the transaction was (partially) streamed, we need to commit it in a
> + * 'streamed' way. That is, we first stream the remaining part of the
> + * transaction, and then invoke stream_commit message.
> + *
> + * XXX Called after everything (origin ID and LSN, ...) is stored in the
> + * transaction, so we don't pass that directly.
> + *
> + * XXX Somewhat hackish redirection, perhaps needs to be refactored?
> + */
> + if (rbtxn_is_streamed(txn))
> + {
> + ReorderBufferStreamCommit(rb, txn);
> + return;
> + }
> +
> ..
> }
>
> "XXX Somewhat hackish redirection, perhaps needs to be refactored?"
> What kind of refactoring can we do here? To me, it looks okay.

I agree it looks fine, so I have removed this comment.

> 8.
> @@ -2295,6 +2677,13 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
> txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
>
> txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
> +
> + /*
> + * TOCHECK: Mark toplevel transaction as having catalog changes too
> + * if one of its children has.
> + */
> + if (txn->toptxn != NULL)
> + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
> }
>
> Why are we marking top transaction here?

We need to mark the top transaction to decide whether or not to build
the tuplecid hash. In non-streaming mode, we only send changes at
commit time, and at commit time we know whether the top transaction
has any catalog changes based on the invalidation messages, so we mark
the top transaction there, in DecodeCommit. In streaming mode we do
not wait until commit, so we need to mark the top transaction as soon
as we mark any of its child transactions.
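The propagation can be sketched as follows, assuming a simplified TXN struct. HAS_CATALOG_CHANGES stands in for RBTXN_HAS_CATALOG_CHANGES, and needs_tuplecid_hash() is a hypothetical check representing the decision about building the tuplecid hash.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define HAS_CATALOG_CHANGES (1u << 0)   /* stand-in for RBTXN_HAS_CATALOG_CHANGES */

typedef struct TXN
{
    uint32_t    txn_flags;
    struct TXN *toptxn;                 /* NULL for a top-level transaction */
} TXN;

static void
set_catalog_changes(TXN *txn)
{
    txn->txn_flags |= HAS_CATALOG_CHANGES;

    /* Streaming cannot wait for DecodeCommit, so mark the parent now. */
    if (txn->toptxn != NULL)
        txn->toptxn->txn_flags |= HAS_CATALOG_CHANGES;
}

/* Hypothetical decision: build the tuplecid hash only if needed. */
static bool
needs_tuplecid_hash(const TXN *top)
{
    assert(top->toptxn == NULL);        /* only asked of top-level txns */
    return (top->txn_flags & HAS_CATALOG_CHANGES) != 0;
}
```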

[1] https://www.postgresql.org/message-id/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment Content-Type Size
v22-0001-Immediately-WAL-log-assignments.patch application/octet-stream 10.6 KB
v22-0004-Gracefully-handle-concurrent-aborts-of-uncommitt.patch application/octet-stream 13.3 KB
v22-0002-Issue-individual-invalidations-with-wal_level-lo.patch application/octet-stream 16.7 KB
v22-0003-Extend-the-output-plugin-API-with-stream-methods.patch application/octet-stream 35.2 KB
v22-0005-Implement-streaming-mode-in-ReorderBuffer.patch application/octet-stream 37.2 KB
v22-0008-Enable-streaming-for-all-subscription-TAP-tests.patch application/octet-stream 14.8 KB
v22-0006-Add-support-for-streaming-to-built-in-replicatio.patch application/octet-stream 90.1 KB
v22-0009-Add-TAP-test-for-streaming-vs.-DDL.patch application/octet-stream 4.4 KB
v22-0007-Track-statistics-for-streaming.patch application/octet-stream 12.0 KB
v22-0010-Bugfix-handling-of-incomplete-toast-tuple.patch application/octet-stream 16.0 KB
v22-0011-Provide-new-api-to-get-the-streaming-changes.patch application/octet-stream 6.3 KB
v22-0012-Add-streaming-option-in-pg_dump.patch application/octet-stream 2.7 KB
