Hello Ajin.

I have gone through the v6 patch changes and have a list of review comments below.

Apologies for the length of this email. I know that many of the following comments are trivial, but I figured I should either ignore everything cosmetic or list everything regardless. I chose the latter.

There may be some duplication where the same review comment is written for multiple files and/or where the same file appears in multiple patches.

Kind Regards.
Peter Smith
Fujitsu Australia

[BEGIN]

==========
Patch V6-0001, File: contrib/test_decoding/expected/prepared.out (and so prepared.sql also)
==========

COMMENT
Line 30 - The INSERT INTO test_prepared1 VALUES (2); is kind of strange because it is not really part of either the prior test or the following test. Maybe it would be better to have a comment describing the purpose of this isolated INSERT, and also to consume the data from the slot so it does not get jumbled with the data of the following (abort) test.

;

COMMENT
Line 53 - Same comment for this test INSERT INTO test_prepared1 VALUES (4); It has nothing really to do with either the prior (abort) test or the following (ddl) test.

;

COMMENT
Line 60 - This seems to check which locks are held for the test_prepared_1 table while the transaction is in progress. Maybe it would be better to have more comments describing what is expected here and why.

;

COMMENT
Line 88 - There is a comment in the test saying "-- We should see '7' before '5' in our results since it commits first." but I did not see any test code that actually verifies that this happens.

;

QUESTION
Line 120 - I did not really understand the SQL checking pg_class. I expected this would be checking table 'test_prepared1' instead. Can you explain it?

    SELECT 'pg_class' AS relation, locktype, mode FROM pg_locks WHERE locktype = 'relation' AND relation = 'pg_class'::regclass;
     relation | locktype | mode
    ----------+----------+------
    (0 rows)

;

QUESTION
Line 139 - SET statement_timeout = '1s'; Is 1 second short enough here for this test, or might these statements complete in less than one second anyhow?

;

QUESTION
Line 163 - How is this testing a SAVEPOINT? Or is it only to check that the SAVEPOINT command is not part of the replicated changes?

;

COMMENT
Line 175 - Missing underscore in the comment. The code also requires the underscore: "nodecode" --> "_nodecode"

==========
Patch V6-0001, File: contrib/test_decoding/test_decoding.c
==========

COMMENT
Line 43
@@ -36,6 +40,7 @@ typedef struct
     bool        skip_empty_xacts;
     bool        xact_wrote_changes;
     bool        only_local;
+    TransactionId check_xid;    /* track abort of this txid */
 } TestDecodingData;

The "check_xid" seems a meaningless name. Check what? IIUC it should maybe be something like "check_xid_aborted".

;

COMMENT
Line 105
@@ -88,6 +93,19 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       int nrelations, Relation relations[],
                                       ReorderBufferChange *change);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+                                     ReorderBufferTXN *txn,

Remove the extra blank line after these function declarations.

;

COMMENT
Line 149
@@ -116,6 +134,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
     cb->stream_change_cb = pg_decode_stream_change;
     cb->stream_message_cb = pg_decode_stream_message;
     cb->stream_truncate_cb = pg_decode_stream_truncate;
+    cb->filter_prepare_cb = pg_decode_filter_prepare;
+    cb->prepare_cb = pg_decode_prepare_txn;
+    cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+    cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
+
 }

There is a confusing mix of terminology where sometimes things are referred to as ROLLBACK/rollback and other times apparently the same operation is referred to as ABORT/abort.

I do not know the root cause of this mixture. IIUC maybe the internal functions and protocol generally use the term "abort", whereas the SQL syntax is "ROLLBACK"... but where those two terms collide in the middle it gets quite confusing.

At least I thought the names of the "callbacks" which get exposed to the user (e.g. in the help) might be better if they matched the SQL.
"abort_prepared_cb" --> "rollback_prepared_cb"

There are similar review comments like this below where the alternating terms caused me some confusion.

~

Also, remove the extra blank line before the end of the function.

;

COMMENT
Line 267
@@ -227,6 +252,42 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                      errmsg("could not parse value \"%s\" for parameter \"%s\"",
                             strVal(elem->arg), elem->defname)));
         }
+        else if (strcmp(elem->defname, "two-phase-commit") == 0)
+        {
+            if (elem->arg == NULL)
+                continue;

IMO the "check-xid" code might be better rearranged so the NULL check comes first instead of using if/else.
e.g.
    if (elem->arg == NULL)
        ereport(FATAL,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("check-xid needs an input value")));

~

Also, is it really supposed to be FATAL instead of ERROR? That is not the same as the other surrounding code.

;

COMMENT
Line 296
    if (data->check_xid <= 0)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("Specify positive value for parameter \"%s\","
                        " you specified \"%s\"",
                        elem->defname, strVal(elem->arg))));

The code checking for <= 0 seems over-complicated. Because the conversion was using strtoul(), I fail to see how this can ever be < 0. Wouldn't it be easier to simply test the result of the strtoul() function?

BEFORE: if (errno == EINVAL || errno == ERANGE)
AFTER: if (data->check_xid == 0)

~

Also, should this be FATAL? Everything else similar is ERROR.

;

COMMENT (general)
I don't recall seeing any of these decoding options (e.g. "two-phase-commit", "check-xid") documented anywhere.
So how can a user even know these options exist so they can use them? Perhaps the options should be described on this page?
https://www.postgresql.org/docs/13/functions-admin.html#FUNCTIONS-REPLICATION

;

COMMENT (general)
"check-xid" is a meaningless option name. Maybe something like "check-xid-aborted" is more useful? I suggest changing the member, the option, and the error messages to match some better name.

;

COMMENT
Line 314
@@ -238,6 +299,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     }

     ctx->streaming &= enable_streaming;
+    ctx->enable_twophase &= enable_2pc;
 }

The "ctx->enable_twophase" name is inconsistent with the "ctx->streaming" member.
"enable_twophase" --> "twophase"

;

COMMENT
Line 374
@@ -297,6 +359,94 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     OutputPluginWrite(ctx, true);
 }

+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here
+ * we demonstrate a simple logic by checking the GID. If the
+ * GID contains the "_nodecode" substring, then we filter
+ * it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Remove the extra preceding blank line.

~

I did not find anything in the help about "_nodecode". Should it be there, or is this a deliberately undocumented feature?

;

QUESTION
Line 440
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Is this comment wrong? "ABORT PREPARED" --> "ROLLBACK PREPARED"

;

COMMENT
Line 620
@@ -455,6 +605,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     }
     data->xact_wrote_changes = true;

+    /* if check_xid is specified */
+    if (TransactionIdIsValid(data->check_xid))
+    {
+        elog(LOG, "waiting for %u to abort", data->check_xid);
+        while (TransactionIdIsInProgress(dat

The check_xid seems a meaningless name, and the comment "/* if check_xid is specified */" was not helpful either.
IIUC the purpose of this is to check that the nominated xid is always rolled back. So the appropriate name may be more like "check-xid-aborted".

;

==========
Patch V6-0001, File: doc/src/sgml/logicaldecoding.sgml
==========

COMMENT/QUESTION
Section 48.6.1
@@ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;

I am confused by the mixing of the terms "abort" and "rollback".
Why is it LogicalDecodeAbortPreparedCB instead of LogicalDecodeRollbackPreparedCB?
Why is it abort_prepared_cb instead of rollback_prepared_cb?

I thought everything the user sees should be ROLLBACK/rollback (like the SQL) regardless of what the internal functions might be called.

;

COMMENT
Section 48.6.1
"The begin_cb, change_cb and commit_cb callbacks are required, while startup_cb, filter_by_origin_cb, truncate_cb, and shutdown_cb are optional. If truncate_cb is not set but a TRUNCATE is to be decoded, the action will be ignored."

The 1st paragraph beneath the typedef does not say whether the newly added callbacks are required or optional.

;

COMMENT
Section 48.6.4.5
Section 48.6.4.6
Section 48.6.4.7
@@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,

The wording and titles are a bit backwards compared to the others. e.g. previously it was "Transaction Begin" (not "Begin Transaction") and "Transaction End" (not "End Transaction").

So, for consistency with the existing sections, IMO these new titles (and wording) should change to:
- "Commit Prepared Transaction Callback" --> "Transaction Commit Prepared Callback"
- "Rollback Prepared Transaction Callback" --> "Transaction Rollback Prepared Callback"
- "whenever a commit prepared transaction has been decoded" --> "whenever a transaction commit prepared has been decoded"
- "whenever a rollback prepared transaction has been decoded."
  --> "whenever a transaction rollback prepared has been decoded."

;

==========
Patch V6-0001, File: src/backend/replication/logical/decode.c
==========

COMMENT
Line 74
@@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                          xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                         xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+                           xl_xact_parsed_prepare * parsed);

The 2nd line of DecodePrepare is misaligned by one space.

;

COMMENT
Line 321
@@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
             }
             break;
         case XLOG_XACT_PREPARE:
+            {
+                xl_xact_parsed_prepare parsed;
+                xl_xact_prepare *xlrec;
+                /* check that output plugin is capable of twophase decoding */

"twophase" --> "two-phase"

~

Also, add a blank line after the declarations.

;

==========
Patch V6-0001, File: src/backend/replication/logical/logical.c
==========

COMMENT
Line 249
@@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options,
         (ctx->callbacks.stream_message_cb != NULL) ||
         (ctx->callbacks.stream_truncate_cb != NULL);

+    /*
+     * To support two phase logical decoding, we require prepare/commit-prepare/abort-prepare
+     * callbacks. The filter-prepare callback is optional. We however enable two phase logical
+     * decoding when at least one of the methods is enabled so that we can easily identify
+     * missing methods.

The term is generally written as "two-phase" (with the hyphen): https://en.wikipedia.org/wiki/Two-phase_commit_protocol so let's be consistent in all the patch code comments. Please search the code and correct this in all places, including any I might have missed.
"two phase" --> "two-phase"

;

COMMENT
Line 822
@@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }

 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                   XLogRecPtr prepare_lsn)

"support 2 phase" --> "supports two-phase" in the comment

;

COMMENT
Line 844
The code condition seems strange and/or broken:
    if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL)
If the flag is false, this check is skipped. But if the callback is also NULL, attempting to call it to "do the actual work" will cause an NPE (NULL pointer dereference).

~

Also, I wonder whether this check should be the first thing in this function. If it fails, does it even make sense that all the errcallback code was set up? e.g. errcallback.arg is potentially left pointing to a stack variable on a stack that no longer exists.

;

COMMENT
Line 857
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

"support 2 phase" --> "supports two-phase" in the comment

~

Also, there is the same potential trouble with the condition:
    if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL)
Same as previously asked: should this check be the first thing in this function?

;

COMMENT
Line 892
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

"support 2 phase" --> "supports two-phase" in the comment

~

Same potential trouble with the condition:
    if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL)
Same as previously asked: should this check be the first thing in this function?
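
To illustrate the ordering I mean for these three wrappers, here is a minimal standalone sketch. It is NOT the patch code. DecodingCtx, prepare_cb_wrapper_sketch, sketch_error_callback and count_prepare are hypothetical, and ErrorContextCallback/error_context_stack are heavily simplified local stand-ins for the PostgreSQL ones. The sketch returns false where the real wrapper would ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily simplified stand-ins for the PostgreSQL types. */
typedef struct ErrorContextCallback
{
    struct ErrorContextCallback *previous;
    void        (*callback) (void *arg);
    void       *arg;
} ErrorContextCallback;

static ErrorContextCallback *error_context_stack = NULL;

typedef struct DecodingCtx
{
    void        (*prepare_cb) (struct DecodingCtx *ctx);
} DecodingCtx;

static void
sketch_error_callback(void *arg)
{
    (void) arg;                 /* real code would add an errcontext() line */
}

/*
 * Validate the callback first, and unconditionally (not only when some
 * enable flag is set), before anything is pushed onto error_context_stack.
 * Returns false where the real wrapper would ereport(ERROR, ...).
 */
static bool
prepare_cb_wrapper_sketch(DecodingCtx *ctx)
{
    ErrorContextCallback errcallback;

    if (ctx->prepare_cb == NULL)
        return false;

    /* Push an error context callback that points at this stack frame. */
    errcallback.callback = sketch_error_callback;
    errcallback.arg = ctx;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Do the actual work: the callback is known to be non-NULL here. */
    ctx->prepare_cb(ctx);

    /* Pop the error context callback again. */
    error_context_stack = errcallback.previous;
    return true;
}

/* Demo callback, used only to check that the wrapper calls through. */
static int  prepare_calls = 0;

static void
count_prepare(DecodingCtx *ctx)
{
    (void) ctx;
    prepare_calls++;
}
```

With the NULL check placed first, the early exit never leaves error_context_stack pointing at the wrapper's stack frame. The same ordering would presumably apply to commit_prepared_cb_wrapper and abort_prepared_cb_wrapper.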

;

COMMENT
Line 1013
@@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     error_context_stack = errcallback.previous;
 }

+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                          TransactionId xid, const char *gid)

Fix wording in comment:
"twophase" --> "two-phase transactions"
"twophase transactions" --> "two-phase transactions"

==========
Patch V6-0001, File: src/backend/replication/logical/reorderbuffer.c
==========

COMMENT
Line 255
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                        char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                                    bool txn_prepared);

The alignment is inconsistent. One more space is needed before "bool txn_prepared".

;

COMMENT
Line 417
@@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
     }

     /* free data that's contained */
+    if (txn->gid != NULL)
+    {
+        pfree(txn->gid);
+        txn->gid = NULL;
+    }

Please add back the blank line before this new code, as it was before.

;

COMMENT
Line 1564
@@ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 }

 /*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them. Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after streaming or
+ * after a PREPARE.

typo "snapshots.If" -> "snapshots. If"

;

COMMENT/QUESTION
Line 1590
@@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
         Assert(rbtxn_is_known_subxact(subtxn));
         Assert(subtxn->nsubtxns == 0);

-        ReorderBufferTruncateTXN(rb, subtxn);
+        ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
     }

There are some code paths here where I did not understand how they match the comments. Because this function is recursive, it seems that it may be called where the 2nd parameter txn is a sub-transaction. But that seems at odds with some of the other code comments in this function, which process the txn without ever testing whether it is really toplevel or not:
e.g. Line 1593 "/* cleanup changes in the toplevel txn */"
e.g. Line 1632 "They are always stored in the toplevel transaction."

;

COMMENT
Line 1644
@@ -1560,9 +1621,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
      * about the toplevel xact (we send the XID in all messages), but we never
      * stream XIDs of empty subxacts.
      */
-    if ((!txn->toptxn) || (txn->nentries_mem != 0))
+    if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
         txn->txn_flags |= RBTXN_IS_STREAMED;

+    if (txn_prepared) /* remove the change from it's containing list */

typo "it's" --> "its"

;

QUESTION
Line 1977
@@ -1880,7 +1965,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                       ReorderBufferChange *specinsert)
 {
     /* Discard the changes that we just streamed */
-    ReorderBufferTruncateTXN(rb, txn);
+    ReorderBufferTruncateTXN(rb, txn, false);

How do you know the 3rd parameter (i.e. txn_prepared) should be hardwired to false here? e.g. I thought that maybe rbtxn_prepared(txn) can be true here.

;

COMMENT
Line 2345
@@ -2249,7 +2334,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                     break;
             }
         }
-
         /*

This looks like an accidental blank line deletion.
This should be put back how it was.

;

COMMENT/QUESTION
Line 2374
@@ -2278,7 +2362,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                 }
             }
             else
-                rb->commit(rb, txn, commit_lsn);
+            {
+                /*
+                 * Call either PREPARE (for twophase transactions) or COMMIT
+                 * (for regular ones).

"twophase" --> "two-phase"

~

Also, I was confused by the apparent assumption that streaming and 2PC are mutually exclusive... e.g. what if it is streaming AND 2PC? Then it won't do rb->prepare().

;

QUESTION
Line 2424
@@ -2319,11 +2412,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
          */
         if (streaming)
         {
-            ReorderBufferTruncateTXN(rb, txn);
+            ReorderBufferTruncateTXN(rb, txn, false);

             /* Reset the CheckXidAlive */
             CheckXidAlive = InvalidTransactionId;
         }
+        else if (rbtxn_prepared(txn))

I was confused by the exclusiveness of streaming/2PC. e.g. what if it is streaming AND 2PC at the same time? How can you pass false as the 3rd param to ReorderBufferTruncateTXN?

;

COMMENT
Line 2463
@@ -2352,17 +2451,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
         /*
          * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
-         * abort of the (sub)transaction we are streaming. We need to do the
+         * abort of the (sub)transaction we are streaming or preparing. We need to do the
          * cleanup and return gracefully on this error, see SetupCheckXidLive.
          */

"twoi phase" --> "two-phase"

;

QUESTIONS
Line 2482
@@ -2370,10 +2470,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
             errdata = NULL;
             curtxn->concurrent_abort = true;

-            /* Reset the TXN so that it is allowed to stream remaining data. */
-            ReorderBufferResetTXN(rb, txn, snapshot_now,
-                                  command_id, prev_lsn,
-                                  specinsert);
+            /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
+            if (streaming)

Re: /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
I was confused by the exclusiveness of streaming/2PC.
Is it not possible for the streaming flag and rbtxn_prepared(txn) to both be true at the same time?

~

    elog(LOG, "stopping decoding of %s (%u)",
         txn->gid[0] != '\0'? txn->gid:"", txn->xid);

Is this a safe operation, or do you also need to test that txn->gid is not NULL?

;

COMMENT
Line 2606
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,

"twophase" --> "two-phase"

;

QUESTION
Line 2655
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,

"This is used to handle COMMIT/ABORT PREPARED"
Should that say "COMMIT/ROLLBACK PREPARED"?

;

COMMENT
Line 2668
"Anyways, 2PC transactions" --> "Anyway, two-phase transactions"

;

COMMENT
Line 2765
@@ -2495,7 +2731,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
     /* cosmetic... */
     txn->final_lsn = lsn;

-    /* remove potential on-disk data, and deallocate */
+    /*
+     * remove potential on-disk data, and deallocate.
+     *

Remove the blank line between the comment and the code.

==========
Patch V6-0001, File: src/include/replication/logical.h
==========

COMMENT
Line 89
"two phase" -> "two-phase"

;

COMMENT
Line 89
For consistency with the previous member naming, the new member should really just be called "twophase" rather than "enable_twophase".

;

==========
Patch V6-0001, File: src/include/replication/output_plugin.h
==========

QUESTION
Line 106
As previously asked, why is the callback function/typedef referred to as AbortPrepared instead of RollbackPrepared? It does not match the SQL or the function comment, and seems only to add unnecessary confusion.

;

==========
Patch V6-0001, File: src/include/replication/reorderbuffer.h
==========

QUESTION
Line 116
@@ -162,9 +163,13 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_STREAMED         0x0008
-#define RBTXN_HAS_TOAST_INSERT    0x0010
-#define RBTXN_HAS_SPEC_INSERT     0x0020
+#define RBTXN_PREPARE             0x0008
+#define RBTXN_COMMIT_PREPARED     0x0010
+#define RBTXN_ROLLBACK_PREPARED   0x0020
+#define RBTXN_COMMIT              0x0040
+#define RBTXN_IS_STREAMED         0x0080
+#define RBTXN_HAS_TOAST_INSERT    0x0100
+#define RBTXN_HAS_SPEC_INSERT     0x0200

I was wondering why, when adding the new flags, some of the existing flag masks were also altered. I am assuming this is OK because they are never persisted but are only used in the protocol (??)

;

COMMENT
Line 226
@@ -218,6 +223,15 @@ typedef struct ReorderBufferChange
     ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
 )

+/* is this txn prepared? */
+#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE)
+/* was this prepared txn committed in the meanwhile? */
+#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
+/* was this txn committed in the meanwhile? */
+#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT)
+

All the "txn->txn_flags" here would probably be safer written with parentheses in the macro, like "(txn)->txn_flags".

~

Also, start all comments with a capital letter. And what is the meaning of "in the meanwhile"?

;

COMMENT
Line 410
@@ -390,6 +407,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr commit_lsn);

The format is inconsistent with all the other callback signatures here, where the 1st arg is on the same line as the typedef.

;

COMMENT
Line 440-442
Excessive blank lines following this change?
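
To illustrate the parentheses suggestion for these reorderbuffer.h macros, here is a minimal standalone sketch. The flag values are copied from the hunk quoted above, and the "!= 0" form copies the existing rbtxn_is_streamed style. The comment wording and the one-field ReorderBufferTXN are hypothetical stand-ins, not the real header.

```c
#include <assert.h>
#include <stdint.h>

/* Flag values copied from the v6 reorderbuffer.h hunk. */
#define RBTXN_PREPARE           0x0008
#define RBTXN_COMMIT_PREPARED   0x0010
#define RBTXN_ROLLBACK_PREPARED 0x0020
#define RBTXN_COMMIT            0x0040

/* Hypothetical minimal stand-in for ReorderBufferTXN. */
typedef struct ReorderBufferTXN
{
    uint32_t    txn_flags;
} ReorderBufferTXN;

/* Is this txn prepared? */
#define rbtxn_prepared(txn) \
( \
    ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)

/* Has this prepared txn been committed? */
#define rbtxn_commit_prepared(txn) \
( \
    ((txn)->txn_flags & RBTXN_COMMIT_PREPARED) != 0 \
)

/* Has this prepared txn been rolled back? */
#define rbtxn_rollback_prepared(txn) \
( \
    ((txn)->txn_flags & RBTXN_ROLLBACK_PREPARED) != 0 \
)

/* Has this txn been committed? */
#define rbtxn_commit(txn) \
( \
    ((txn)->txn_flags & RBTXN_COMMIT) != 0 \
)
```

With the unparenthesized form, an argument such as txns + 1 would expand to txns + 1->txn_flags and fail to compile. With (txn) it expands to (txns + 1)->txn_flags as intended.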

;

COMMENT
Line 638
@@ -571,6 +631,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);

+bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+                                  const char *gid);
+bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+                                const char *gid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+                          XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                          TimestampTz commit_time,
+                          RepOriginId origin_id, XLogRecPtr origin_lsn,
+                          char *gid);

These are not aligned consistently with the other function prototypes.

;

==========
Patch V6-0003, File: src/backend/access/transam/twophase.c
==========

COMMENT
Line 551
@@ -548,6 +548,37 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 }

 /*
+ * LookupGXact
+ *      Check if the prepared transaction with the given GID is around
+ */
+bool
+LookupGXact(const char *gid)

There is potential to refactor/simplify this code.
e.g.

bool
LookupGXact(const char *gid)
{
    int         i;
    bool        found = false;

    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    {
        GlobalTransaction gxact = TwoPhaseState->prepXacts[i];

        /* Ignore not-yet-valid GIDs */
        if (gxact->valid && strcmp(gxact->gid, gid) == 0)
        {
            found = true;
            break;
        }
    }
    LWLockRelease(TwoPhaseStateLock);
    return found;
}

;

==========
Patch V6-0003, File: src/backend/replication/logical/proto.c
==========

COMMENT
Line 86
@@ -72,12 +72,17 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
  */
 void
 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
-                        XLogRecPtr commit_lsn)

Since the flags are now used, this code comment is wrong:
"/* send the flags field (unused for now) */"

;

COMMENT
Line 129
@@ -106,6 +115,77 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
 }

 /*
+ * Write PREPARE to the output stream.
+ */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, "2PC transactions" --> "two-phase commit transactions" ; COMMENT Line 133 Assert(strlen(txn->gid) > 0); Shouldn't that assertion also check txn->gid is not NULL (to prevent NPE in case gid was NULL) ; COMMENT Line 177 +logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data) prepare_data->prepare_type = flags; This code may be OK but it does seem a bit of an abuse of the flags. e.g. Are they flags or are the really enum values? e.g. And if they are effectively enums (it appears they are) then seemed inconsistent that |= was used when they were previously assigned. ; ========== Patch V6-0003, File: src/backend/replication/logical/worker.c ========== COMMENT Line 757 @@ -749,6 +753,141 @@ apply_handle_commit(StringInfo s) pgstat_report_activity(STATE_IDLE, NULL); } +static void +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data) +{ + Assert(prepare_data->prepare_lsn == remote_final_lsn); Missing function comment to say this is called from apply_handle_prepare. ; COMMENT Line 798 +apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data) Missing function comment to say this is called from apply_handle_prepare. ; COMMENT Line 824 +apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data) Missing function comment to say this is called from apply_handle_prepare. ========== Patch V6-0003, File: src/backend/replication/pgoutput/pgoutput.c ========== COMMENT Line 50 @@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); The parameter indentation (2nd lines) does not match everything else in this context. 

;

COMMENT
Line 152
@@ -143,6 +149,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
     cb->change_cb = pgoutput_change;
     cb->truncate_cb = pgoutput_truncate;
     cb->commit_cb = pgoutput_commit_txn;
+
+    cb->prepare_cb = pgoutput_prepare_txn;
+    cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+    cb->abort_prepared_cb = pgoutput_abort_prepared_txn;

Remove the unnecessary blank line.

;

QUESTION
Line 386
@@ -373,7 +383,49 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     OutputPluginUpdateProgress(ctx);
     OutputPluginPrepareWrite(ctx, true);
-    logicalrep_write_commit(ctx->out, txn, commit_lsn);
+    logicalrep_write_commit(ctx->out, txn, commit_lsn, true);

Is the is_commit parameter of logicalrep_write_commit ever passed as false? If yes, where? If no, then what is the point of it?

;

COMMENT
Line 408
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Since this function is identical to pgoutput_prepare_txn, it might be better to either:
1. just leave this as a wrapper that delegates to that function, or
2. remove this one entirely and assign the callback to the common pgoutput_prepare_txn.

;

COMMENT
Line 419
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Since this function is identical to pgoutput_prepare_txn, it might be better to either:
1. just leave this as a wrapper that delegates to that function, or
2. remove this one entirely and assign the callback to the common pgoutput_prepare_txn.

;

COMMENT
Line 419
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Shouldn't this comment say "ROLLBACK PREPARED"?
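
Here is a minimal standalone sketch of the two options above. Ctx, Txn, PrepareCB, Callbacks and init_callbacks_option2 are hypothetical simplified stand-ins (not the real LogicalDecodingContext, ReorderBufferTXN or OutputPluginCallbacks). The pgoutput_* functions here are stubs that only keep the patch's names, and the stub body of pgoutput_prepare_txn just counts calls in place of the shared write logic.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for LogicalDecodingContext, ReorderBufferTXN. */
typedef struct Ctx
{
    int         messages_written;
} Ctx;

typedef struct Txn
{
    const char *gid;
} Txn;

typedef uint64_t XLogRecPtr;

/* Stub standing in for the shared body of pgoutput_prepare_txn. */
static void
pgoutput_prepare_txn(Ctx *ctx, Txn *txn, XLogRecPtr prepare_lsn)
{
    (void) txn;
    (void) prepare_lsn;
    ctx->messages_written++;    /* stands in for the shared write logic */
}

/* Option 1: keep the callbacks, but as thin delegating wrappers. */
static void
pgoutput_commit_prepared_txn(Ctx *ctx, Txn *txn, XLogRecPtr prepare_lsn)
{
    pgoutput_prepare_txn(ctx, txn, prepare_lsn);
}

static void
pgoutput_abort_prepared_txn(Ctx *ctx, Txn *txn, XLogRecPtr prepare_lsn)
{
    pgoutput_prepare_txn(ctx, txn, prepare_lsn);
}

/* Hypothetical subset of OutputPluginCallbacks. */
typedef void (*PrepareCB) (Ctx *ctx, Txn *txn, XLogRecPtr lsn);

typedef struct Callbacks
{
    PrepareCB   prepare_cb;
    PrepareCB   commit_prepared_cb;
    PrepareCB   abort_prepared_cb;
} Callbacks;

/* Option 2: drop the duplicates; register the common function three times. */
static void
init_callbacks_option2(Callbacks *cb)
{
    cb->prepare_cb = pgoutput_prepare_txn;
    cb->commit_prepared_cb = pgoutput_prepare_txn;
    cb->abort_prepared_cb = pgoutput_prepare_txn;
}
```

Option 1 keeps a distinct symbol per callback, which may help if the bodies later need to diverge. Option 2 is the least code.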

;

==========
Patch V6-0003, File: src/include/replication/logicalproto.h
==========

QUESTION
Line 101
@@ -87,20 +87,55 @@ typedef struct LogicalRepBeginData
     TransactionId xid;
 } LogicalRepBeginData;

+/* Commit (and abort) information */
 #define LOGICALREP_IS_ABORT 0x02

Is there a good reason why this is not called:
#define LOGICALREP_IS_ROLLBACK 0x02

;

COMMENT
Line 105
((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT))

Macros would be safer if flags were in parentheses:
(((flags) == LOGICALREP_IS_COMMIT) || ((flags) == LOGICALREP_IS_ABORT))

;

COMMENT
Line 115
Unexpected whitespace for the typedef "} LogicalRepPrepareData;"

;

COMMENT
Line 122
/* prepare can be exactly one of PREPARE, [COMMIT|ABORT] PREPARED*/
#define PrepareFlagsAreValid(flags) \
    ((flags == LOGICALREP_IS_PREPARE) || \
     (flags == LOGICALREP_IS_COMMIT_PREPARED) || \
     (flags == LOGICALREP_IS_ROLLBACK_PREPARED))

There is a confusing mixture of the ABORT and ROLLBACK terms in the macros and comments.
"[COMMIT|ABORT] PREPARED" --> "[COMMIT|ROLLBACK] PREPARED"

~

Also, it would be safer if flags were in parentheses:
    (((flags) == LOGICALREP_IS_PREPARE) || \
     ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \
     ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED))

;

==========
Patch V6-0003, File: src/test/subscription/t/020_twophase.pl
==========

COMMENT
Line 131 - # check inserts are visible
Isn't this supposed to be checking for rows 12 and 13, instead of 11 and 12?

;

==========
Patch V6-0004, File: contrib/test_decoding/test_decoding.c
==========

COMMENT
Line 81
@@ -78,6 +78,15 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn,
                                    XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+                                     ReorderBufferTXN *txn,
+                                     XLogRecPtr commit_lsn);
+static

All these functions have a 3rd parameter called commit_lsn, even though the functions are not commit-related. It seems like a cut/paste error.
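
Going back to the logicalproto.h parentheses comments (Line 105 and Line 122), here is a small standalone demonstration of why they matter. Only LOGICALREP_IS_ABORT 0x02 comes from the patch. The value 0x01 for LOGICALREP_IS_COMMIT, both macro names, and the masked argument raw & 0x0F are hypothetical, chosen only to show the problem.

```c
#include <assert.h>

/* 0x02 is from the patch; 0x01 for COMMIT is an assumption here. */
#define LOGICALREP_IS_COMMIT 0x01
#define LOGICALREP_IS_ABORT  0x02

/* Hypothetical names; the body is the unparenthesized Line 105 form. */
#define CommitFlagsAreValidUnsafe(flags) \
    ((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT))

/* The suggested form: every use of the argument is parenthesized. */
#define CommitFlagsAreValid(flags) \
    (((flags) == LOGICALREP_IS_COMMIT) || ((flags) == LOGICALREP_IS_ABORT))
```

With a hypothetical raw byte 0x81, CommitFlagsAreValidUnsafe(raw & 0x0F) expands to raw & 0x0F == 0x01. Because == binds tighter than &, that is raw & (0x0F == 0x01), i.e. raw & 0, so the macro says "invalid" even though the masked value is 0x01. The parenthesized form gets it right. The same applies to PrepareFlagsAreValid.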

;

COMMENT
Line 142
@@ -130,6 +139,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
     cb->stream_start_cb = pg_decode_stream_start;
     cb->stream_stop_cb = pg_decode_stream_stop;
     cb->stream_abort_cb = pg_decode_stream_abort;
+    cb->stream_prepare_cb = pg_decode_stream_prepare;
+    cb->stream_commit_prepared_cb = pg_decode_stream_commit_prepared;
+    cb->stream_abort_prepared_cb = pg_decode_stream_abort_prepared;
     cb->stream_commit_cb = pg_decode_stream_commit;

Can the "cb->stream_abort_prepared_cb" be changed to "cb->stream_rollback_prepared_cb"?

;

COMMENT
Line 827
@@ -812,6 +824,78 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 }

 static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+                         ReorderBufferTXN *txn,
+                         XLogRecPtr commit_lsn)
+{
+    TestDecodingData *data = ctx->output_plugin_pr

The commit_lsn (3rd parameter) is unused, and its name seems like a cut/paste error.

;

COMMENT
Line 875
+pg_decode_stream_abort_prepared(LogicalDecodingContext *ctx,

The commit_lsn (3rd parameter) is unused, and its name seems like a cut/paste error.

;

==========
Patch V6-0004, File: doc/src/sgml/logicaldecoding.sgml
==========

COMMENT
48.6.1
@@ -396,6 +396,9 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeStreamStartCB stream_start_cb;
     LogicalDecodeStreamStopCB stream_stop_cb;
     LogicalDecodeStreamAbortCB stream_abort_cb;
+    LogicalDecodeStreamPrepareCB stream_prepare_cb;
+    LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb;
+    LogicalDecodeStreamAbortPreparedCB stream_abort_prepared_cb;

Same question as in the previous review comments: why use the term "abort" instead of "rollback"?

;

COMMENT
48.6.1
@@ -418,7 +421,9 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
     in-progress transactions.
     The stream_start_cb, stream_stop_cb, stream_abort_cb,
     stream_commit_cb and stream_change_cb
-    are required, while stream_message_cb and
+    are required, while stream_message_cb,
+    stream_prepare_cb, stream_commit_prepared_cb,
+    stream_abort_prepared_cb,

Missing "and".
... "stream_abort_prepared_cb, stream_truncate_cb are optional." --> "stream_abort_prepared_cb, and stream_truncate_cb are optional."

;

COMMENT
Section 48.6.4.16
Section 48.6.4.17
Section 48.6.4.18
@@ -839,6 +844,45 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+    Stream Prepare Callback
+
+     The stream_prepare_cb callback is called to prepare
+     a previously streamed transaction as part of a two phase commit.
+
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr abort_lsn);
+
+    Stream Commit Prepared Callback
+
+     The stream_commit_prepared_cb callback is called to commit prepared
+     a previously streamed transaction as part of a two phase commit.
+
+typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                     ReorderBufferTXN *txn,
+                                                     XLogRecPtr abort_lsn);
+
+    Stream Abort Prepared Callback
+
+     The stream_abort_prepared_cb callback is called to abort prepared
+     a previously streamed transaction as part of a two phase commit.
+
+typedef void (*LogicalDecodeStreamAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                    ReorderBufferTXN *txn,
+                                                    XLogRecPtr abort_lsn);

1. Everywhere it says "two phase" commit, it should consistently say "two-phase" commit (with the hyphen).

2. Search for the "abort_lsn" parameter. It seems to be overused (cut/paste error) even where the API is unrelated to abort.

3. 48.6.4.17 and 48.6.4.18
Is this wording OK? Is the word "prepared" even necessary here?
- "... called to commit prepared a previously streamed transaction ..."
- "... called to abort prepared a previously streamed transaction ..."
;

COMMENT
Section 48.9
@@ -1017,9 +1061,13 @@ OutputPluginWrite(ctx, true);
 When streaming an in-progress transaction, the changes (and messages) are
 streamed in blocks demarcated by stream_start_cb
 and stream_stop_cb callbacks. Once all the decoded
- changes are transmitted, the transaction is committed using the
- stream_commit_cb callback (or possibly aborted using
- the stream_abort_cb callback).
+ changes are transmitted, the transaction can be committed using the
+ the stream_commit_cb callback

"two phase" --> "two-phase"

~

Also, missing period at the end of the sentence:
"or aborted using the stream_abort_prepared_cb" --> "or aborted using the stream_abort_prepared_cb."

;

==========
Patch V6-0004, File: src/backend/replication/logical/logical.c
==========

COMMENT
Line 84
@@ -81,6 +81,12 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 XLogRecPtr last_lsn);
 static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);

The 3rd parameter is always "commit_lsn", even for APIs unrelated to commit, so this looks like a cut/paste error.

;

COMMENT
Line 1246
@@ -1231,6 +1243,105 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;

Misnamed parameter "commit_lsn"?

~

Also, Line 1272
An integrity check to make sure the callback is not NULL seems to be missing.
A NULL callback will cause a NULL pointer dereference when the wrapper attempts to call it.

;

COMMENT
Line 1305
+static void
+stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

An integrity check to make sure the callback is not NULL seems to be missing. A NULL callback will cause a NULL pointer dereference when the wrapper attempts to call it.

;

COMMENT
Line 1312
+static void
+stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

Misnamed parameter "commit_lsn"?

~

Also, Line 1338
An integrity check to make sure the callback is not NULL seems to be missing. A NULL callback will cause a NULL pointer dereference when the wrapper attempts to call it.

==========
Patch V6-0004, File: src/backend/replication/logical/reorderbuffer.c
==========

COMMENT
Line 2684
@@ -2672,15 +2681,31 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
 strcpy(txn->gid, gid);
- if (is_commit)
+ if (rbtxn_is_streamed(txn))
 {
- txn->txn_flags |= RBTXN_COMMIT_PREPARED;
- rb->commit_prepared(rb, txn, commit_lsn);
+ if (is_commit)
+ {
+ txn->txn_flags |= RBTXN_COMMIT_PREPARED;

The setting/checking of the flags could be refactored if you wanted to write less code, e.g.

    if (is_commit)
        txn->txn_flags |= RBTXN_COMMIT_PREPARED;
    else
        txn->txn_flags |= RBTXN_ROLLBACK_PREPARED;

    if (rbtxn_is_streamed(txn) && rbtxn_commit_prepared(txn))
        rb->stream_commit_prepared(rb, txn, commit_lsn);
    else if (rbtxn_is_streamed(txn) && rbtxn_rollback_prepared(txn))
        rb->stream_abort_prepared(rb, txn, commit_lsn);
    else if (rbtxn_commit_prepared(txn))
        rb->commit_prepared(rb, txn, commit_lsn);
    else if (rbtxn_rollback_prepared(txn))
        rb->abort_prepared(rb, txn, commit_lsn);

;

==========
Patch V6-0004, File: src/include/replication/output_plugin.h
==========

COMMENT
Line 171
@@ -157,6 +157,33 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
 XLogRecPtr abort_lsn);
 /*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit and only when
+ * two-phased commits are supported
+ */

1. A period is missing at the end of all these comments.

2. Is the part that says "and only when two-phased commits are supported" necessary? It seems redundant, since the comment already says it is called as part of a two-phase commit.

;

==========
Patch V6-0004, File: src/include/replication/reorderbuffer.h
==========

COMMENT
Line 467
@@ -466,6 +466,24 @@ typedef void (*ReorderBufferStreamAbortCB) (
 ReorderBufferTXN *txn,
 XLogRecPtr abort_lsn);
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitPreparedCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortPreparedCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);

Cut/paste error: is the same comment repeated 3 times?

[END]
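To make the ReorderBufferFinishPrepared refactoring and the NULL-callback integrity checks concrete, here is a standalone sketch combining both ideas: set the outcome flag once, select exactly one callback from the flags, and refuse to call through a NULL pointer. DemoTXN, DemoBuffer, the DEMO_* flag values and the toy callbacks are hypothetical stand-ins for ReorderBufferTXN, ReorderBuffer and the RBTXN_* flags; this is not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-ins for the real reorderbuffer.h types and flags. */
typedef uint64_t XLogRecPtr;

#define DEMO_IS_STREAMED        0x0001
#define DEMO_COMMIT_PREPARED    0x0002
#define DEMO_ROLLBACK_PREPARED  0x0004

typedef struct DemoTXN
{
    uint32_t    txn_flags;
} DemoTXN;

typedef void (*DemoFinishCB) (DemoTXN *txn, XLogRecPtr lsn);

typedef struct DemoBuffer
{
    DemoFinishCB commit_prepared;
    DemoFinishCB abort_prepared;
    DemoFinishCB stream_commit_prepared;
    DemoFinishCB stream_abort_prepared;
} DemoBuffer;

/*
 * Set the outcome flag once, then pick exactly one callback based on the
 * flags. A NULL callback is reported and skipped instead of being called.
 * Returns true if a callback was invoked.
 */
static bool
demo_finish_prepared(DemoBuffer *rb, DemoTXN *txn, XLogRecPtr lsn, bool is_commit)
{
    bool        streamed;
    DemoFinishCB cb;
    const char *name;

    assert(rb != NULL && txn != NULL);

    if (is_commit)
        txn->txn_flags |= DEMO_COMMIT_PREPARED;
    else
        txn->txn_flags |= DEMO_ROLLBACK_PREPARED;

    streamed = (txn->txn_flags & DEMO_IS_STREAMED) != 0;

    if (streamed && (txn->txn_flags & DEMO_COMMIT_PREPARED))
    {
        cb = rb->stream_commit_prepared;
        name = "stream_commit_prepared";
    }
    else if (streamed && (txn->txn_flags & DEMO_ROLLBACK_PREPARED))
    {
        cb = rb->stream_abort_prepared;
        name = "stream_abort_prepared";
    }
    else if (txn->txn_flags & DEMO_COMMIT_PREPARED)
    {
        cb = rb->commit_prepared;
        name = "commit_prepared";
    }
    else
    {
        cb = rb->abort_prepared;
        name = "abort_prepared";
    }

    /* The integrity check: never call through a NULL pointer. */
    if (cb == NULL)
    {
        fprintf(stderr, "%s callback is not set\n", name);
        return false;
    }

    cb(txn, lsn);
    return true;
}

/* Toy callbacks that just record which one ran (for the usage example). */
static int  demo_last_called = 0;

static void demo_commit_cb(DemoTXN *t, XLogRecPtr l)        { (void) t; (void) l; demo_last_called = 1; }
static void demo_abort_cb(DemoTXN *t, XLogRecPtr l)         { (void) t; (void) l; demo_last_called = 2; }
static void demo_stream_commit_cb(DemoTXN *t, XLogRecPtr l) { (void) t; (void) l; demo_last_called = 3; }
```

In the backend itself, a missing required callback would presumably be reported with ereport(ERROR, ...) rather than fprintf; the boolean return here only keeps the sketch easy to test.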