From 97a249ab66884d23780790caf18cbe03cd94f3fc Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 22 Sep 2025 14:47:48 +0200 Subject: [PATCH 06/23] C11 anonymous unions [reorderbuffer data] --- contrib/test_decoding/test_decoding.c | 24 +- src/backend/replication/logical/decode.c | 54 ++-- .../replication/logical/reorderbuffer.c | 254 +++++++++--------- src/backend/replication/pgoutput/pgoutput.c | 14 +- src/include/replication/reorderbuffer.h | 2 +- 5 files changed, 174 insertions(+), 174 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 36e77c69e1c..1723651b770 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -639,41 +639,41 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: appendStringInfoString(ctx->out, " INSERT:"); - if (change->data.tp.newtuple == NULL) + if (change->tp.newtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); else tuple_to_stringinfo(ctx->out, tupdesc, - change->data.tp.newtuple, + change->tp.newtuple, false); break; case REORDER_BUFFER_CHANGE_UPDATE: appendStringInfoString(ctx->out, " UPDATE:"); - if (change->data.tp.oldtuple != NULL) + if (change->tp.oldtuple != NULL) { appendStringInfoString(ctx->out, " old-key:"); tuple_to_stringinfo(ctx->out, tupdesc, - change->data.tp.oldtuple, + change->tp.oldtuple, true); appendStringInfoString(ctx->out, " new-tuple:"); } - if (change->data.tp.newtuple == NULL) + if (change->tp.newtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); else tuple_to_stringinfo(ctx->out, tupdesc, - change->data.tp.newtuple, + change->tp.newtuple, false); break; case REORDER_BUFFER_CHANGE_DELETE: appendStringInfoString(ctx->out, " DELETE:"); /* if there was no PK, we only know that a delete happened */ - if (change->data.tp.oldtuple == NULL) + if (change->tp.oldtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); /* In DELETE, only the replica identity is present; display that */ else tuple_to_stringinfo(ctx->out, tupdesc, - change->data.tp.oldtuple, + change->tp.oldtuple, true); break; default: @@ -724,12 +724,12 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, appendStringInfoString(ctx->out, ": TRUNCATE:"); - if (change->data.truncate.restart_seqs - || change->data.truncate.cascade) + if (change->truncate.restart_seqs + || change->truncate.cascade) { - if (change->data.truncate.restart_seqs) + if (change->truncate.restart_seqs) appendStringInfoString(ctx->out, " restart_seqs"); - if (change->data.truncate.cascade) + if (change->truncate.cascade) appendStringInfoString(ctx->out, " cascade"); } else diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9..f7773e38dda 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -935,17 +935,17 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator)); + memcpy(&change->tp.rlocator, &target_locator, sizeof(RelFileLocator)); tupledata = XLogRecGetBlockData(r, 0, &datalen); tuplelen = datalen - SizeOfHeapHeader; - change->data.tp.newtuple = + change->tp.newtuple = ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); - DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + DecodeXLogTuple(tupledata, datalen, change->tp.newtuple); - change->data.tp.clear_toast_afterwards = true; + change->tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, @@ -981,7 +981,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferAllocChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator)); + memcpy(&change->tp.rlocator, &target_locator, sizeof(RelFileLocator)); if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE) { @@ -992,10 +992,10 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) tuplelen = datalen - SizeOfHeapHeader; - change->data.tp.newtuple = + change->tp.newtuple = ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); - DecodeXLogTuple(data, datalen, change->data.tp.newtuple); + DecodeXLogTuple(data, datalen, change->tp.newtuple); } if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) @@ -1008,13 +1008,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; tuplelen = datalen - SizeOfHeapHeader; - change->data.tp.oldtuple = + change->tp.oldtuple = ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); - DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); + DecodeXLogTuple(data, datalen, change->tp.oldtuple); } - change->data.tp.clear_toast_afterwards = true; + change->tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); @@ -1053,7 +1053,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator)); + memcpy(&change->tp.rlocator, &target_locator, sizeof(RelFileLocator)); /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) @@ -1063,14 +1063,14 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = + change->tp.oldtuple = ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - datalen, change->data.tp.oldtuple); + datalen, change->tp.oldtuple); } - change->data.tp.clear_toast_afterwards = true; + change->tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); @@ -1100,13 +1100,13 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->action = REORDER_BUFFER_CHANGE_TRUNCATE; change->origin_id = XLogRecGetOrigin(r); if (xlrec->flags & XLH_TRUNCATE_CASCADE) - change->data.truncate.cascade = true; + change->truncate.cascade = true; if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) - change->data.truncate.restart_seqs = true; - change->data.truncate.nrelids = xlrec->nrelids; - change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder, - xlrec->nrelids); - memcpy(change->data.truncate.relids, xlrec->relids, + change->truncate.restart_seqs = true; + change->truncate.nrelids = xlrec->nrelids; + change->truncate.relids = ReorderBufferAllocRelids(ctx->reorder, + xlrec->nrelids); + memcpy(change->truncate.relids, xlrec->relids, xlrec->nrelids * sizeof(Oid)); ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); @@ -1166,16 +1166,16 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->action = REORDER_BUFFER_CHANGE_INSERT; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator)); + memcpy(&change->tp.rlocator, &rlocator, sizeof(RelFileLocator)); xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); data = ((char *) xlhdr) + SizeOfMultiInsertTuple; datalen = xlhdr->datalen; - change->data.tp.newtuple = + change->tp.newtuple = ReorderBufferAllocTupleBuf(ctx->reorder, datalen); - tuple = change->data.tp.newtuple; + tuple = change->tp.newtuple; header = tuple->t_data; /* not a disk based tuple */ @@ -1202,9 +1202,9 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI && (i + 1) == xlrec->ntuples) - change->data.tp.clear_toast_afterwards = true; + change->tp.clear_toast_afterwards = true; else - change->data.tp.clear_toast_afterwards = false; + change->tp.clear_toast_afterwards = false; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); @@ -1241,9 +1241,9 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM; change->origin_id = XLogRecGetOrigin(r); - memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator)); + memcpy(&change->tp.rlocator, &target_locator, sizeof(RelFileLocator)); - change->data.tp.clear_toast_afterwards = true; + change->tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a5e165fb123..afde2c5c208 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -533,44 +533,44 @@ ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: - if (change->data.tp.newtuple) + if (change->tp.newtuple) { - ReorderBufferFreeTupleBuf(change->data.tp.newtuple); - change->data.tp.newtuple = NULL; + ReorderBufferFreeTupleBuf(change->tp.newtuple); + change->tp.newtuple = NULL; } - if (change->data.tp.oldtuple) + if (change->tp.oldtuple) { - ReorderBufferFreeTupleBuf(change->data.tp.oldtuple); - change->data.tp.oldtuple = NULL; + ReorderBufferFreeTupleBuf(change->tp.oldtuple); + change->tp.oldtuple = NULL; } break; case REORDER_BUFFER_CHANGE_MESSAGE: - if (change->data.msg.prefix != NULL) - pfree(change->data.msg.prefix); - change->data.msg.prefix = NULL; - if (change->data.msg.message != NULL) - pfree(change->data.msg.message); - change->data.msg.message = NULL; + if (change->msg.prefix != NULL) + pfree(change->msg.prefix); + change->msg.prefix = NULL; + if (change->msg.message != NULL) + pfree(change->msg.message); + change->msg.message = NULL; break; case REORDER_BUFFER_CHANGE_INVALIDATION: - if (change->data.inval.invalidations) - pfree(change->data.inval.invalidations); - change->data.inval.invalidations = NULL; + if (change->inval.invalidations) + pfree(change->inval.invalidations); + change->inval.invalidations = NULL; break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: - if (change->data.snapshot) + if (change->snapshot) { - ReorderBufferFreeSnap(rb, change->data.snapshot); - change->data.snapshot = NULL; + ReorderBufferFreeSnap(rb, change->snapshot); + change->snapshot = NULL; } break; /* no data in addition to the struct itself */ case REORDER_BUFFER_CHANGE_TRUNCATE: - if (change->data.truncate.relids != NULL) + if (change->truncate.relids != NULL) { - ReorderBufferFreeRelids(rb, change->data.truncate.relids); - change->data.truncate.relids = NULL; + ReorderBufferFreeRelids(rb, change->truncate.relids); + change->truncate.relids = NULL; } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -771,7 +771,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; else if (rbtxn_has_partial_change(toptxn) && IsInsertOrUpdate(change->action) && - change->data.tp.clear_toast_afterwards) + change->tp.clear_toast_afterwards) toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; /* @@ -892,10 +892,10 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, change = ReorderBufferAllocChange(rb); change->action = REORDER_BUFFER_CHANGE_MESSAGE; - change->data.msg.prefix = pstrdup(prefix); - change->data.msg.message_size = message_size; - change->data.msg.message = palloc(message_size); - memcpy(change->data.msg.message, message, message_size); + change->msg.prefix = pstrdup(prefix); + change->msg.message_size = message_size; + change->msg.message = palloc(message_size); + memcpy(change->msg.message, message, message_size); ReorderBufferQueueChange(rb, xid, lsn, change, false); @@ -1866,18 +1866,18 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) /* be careful about padding */ memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); - key.rlocator = change->data.tuplecid.locator; + key.rlocator = change->tuplecid.locator; - ItemPointerCopy(&change->data.tuplecid.tid, + ItemPointerCopy(&change->tuplecid.tid, &key.tid); ent = (ReorderBufferTupleCidEnt *) hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found); if (!found) { - ent->cmin = change->data.tuplecid.cmin; - ent->cmax = change->data.tuplecid.cmax; - ent->combocid = change->data.tuplecid.combocid; + ent->cmin = change->tuplecid.cmin; + ent->cmax = change->tuplecid.cmax; + ent->combocid = change->tuplecid.combocid; } else { @@ -1885,16 +1885,16 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) * Maybe we already saw this tuple before in this transaction, but * if so it must have the same cmin. */ - Assert(ent->cmin == change->data.tuplecid.cmin); + Assert(ent->cmin == change->tuplecid.cmin); /* * cmax may be initially invalid, but once set it can only grow, * and never become invalid again. */ Assert((ent->cmax == InvalidCommandId) || - ((change->data.tuplecid.cmax != InvalidCommandId) && - (change->data.tuplecid.cmax > ent->cmax))); - ent->cmax = change->data.tuplecid.cmax; + ((change->tuplecid.cmax != InvalidCommandId) && + (change->tuplecid.cmax > ent->cmax))); + ent->cmax = change->tuplecid.cmax; } } } @@ -2101,14 +2101,14 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, { if (streaming) rb->stream_message(rb, txn, change->lsn, true, - change->data.msg.prefix, - change->data.msg.message_size, - change->data.msg.message); + change->msg.prefix, + change->msg.message_size, + change->msg.message); else rb->message(rb, txn, change->lsn, true, - change->data.msg.prefix, - change->data.msg.message_size, - change->data.msg.message); + change->msg.prefix, + change->msg.message_size, + change->msg.message); } /* @@ -2316,7 +2316,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (specinsert == NULL) elog(ERROR, "invalid ordering of speculative insertion changes"); - Assert(specinsert->data.tp.oldtuple == NULL); + Assert(specinsert->tp.oldtuple == NULL); change = specinsert; change->action = REORDER_BUFFER_CHANGE_INSERT; @@ -2326,8 +2326,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: Assert(snapshot_now); - reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid, - change->data.tp.rlocator.relNumber); + reloid = RelidByRelfilenumber(change->tp.rlocator.spcOid, + change->tp.rlocator.relNumber); /* * Mapped catalog tuple without data, emitted while @@ -2342,12 +2342,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * mapping the relfilenumber to the oid. */ if (reloid == InvalidOid && - change->data.tp.newtuple == NULL && - change->data.tp.oldtuple == NULL) + change->tp.newtuple == NULL && + change->tp.oldtuple == NULL) goto change_done; else if (reloid == InvalidOid) elog(ERROR, "could not map filenumber \"%s\" to relation OID", - relpathperm(change->data.tp.rlocator, + relpathperm(change->tp.rlocator, MAIN_FORKNUM).str); relation = RelationIdGetRelation(reloid); @@ -2355,7 +2355,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, if (!RelationIsValid(relation)) elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")", reloid, - relpathperm(change->data.tp.rlocator, + relpathperm(change->tp.rlocator, MAIN_FORKNUM).str); if (!RelationIsLogicallyLogged(relation)) @@ -2389,7 +2389,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * they're not required anymore. The creator of the * tuple tells us. */ - if (change->data.tp.clear_toast_afterwards) + if (change->tp.clear_toast_afterwards) ReorderBufferToastReset(rb, txn); } /* we're not interested in toast deletions */ @@ -2403,7 +2403,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * freed/reused while restoring spooled data from * disk. */ - Assert(change->data.tp.newtuple != NULL); + Assert(change->tp.newtuple != NULL); dlist_delete(&change->node); ReorderBufferToastAppendChunk(rb, txn, relation, @@ -2473,7 +2473,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * completely new tuple to avoid confusion about the * previous tuple's toast chunks. */ - Assert(change->data.tp.clear_toast_afterwards); + Assert(change->tp.clear_toast_afterwards); ReorderBufferToastReset(rb, txn); /* We don't need this record anymore. */ @@ -2485,14 +2485,14 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; - int nrelids = change->data.truncate.nrelids; + int nrelids = change->truncate.nrelids; int nrelations = 0; Relation *relations; relations = palloc0(nrelids * sizeof(Relation)); for (i = 0; i < nrelids; i++) { - Oid relid = change->data.truncate.relids[i]; + Oid relid = change->truncate.relids[i]; Relation rel; rel = RelationIdGetRelation(relid); @@ -2523,8 +2523,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INVALIDATION: /* Execute the invalidation messages locally */ - ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, - change->data.inval.invalidations); + ReorderBufferExecuteInvalidations(change->inval.ninvalidations, + change->inval.invalidations); break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -2535,7 +2535,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, { ReorderBufferFreeSnap(rb, snapshot_now); snapshot_now = - ReorderBufferCopySnap(rb, change->data.snapshot, + ReorderBufferCopySnap(rb, change->snapshot, txn, command_id); } @@ -2544,15 +2544,15 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * free. We could introduce refcounting for that, but for * now this seems infrequent enough not to care. */ - else if (change->data.snapshot->copied) + else if (change->snapshot->copied) { snapshot_now = - ReorderBufferCopySnap(rb, change->data.snapshot, + ReorderBufferCopySnap(rb, change->snapshot, txn, command_id); } else { - snapshot_now = change->data.snapshot; + snapshot_now = change->snapshot; } /* and continue with the new one */ @@ -2560,11 +2560,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: - Assert(change->data.command_id != InvalidCommandId); + Assert(change->command_id != InvalidCommandId); - if (command_id < change->data.command_id) + if (command_id < change->command_id) { - command_id = change->data.command_id; + command_id = change->command_id; if (!snapshot_now->copied) { @@ -3309,7 +3309,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change = ReorderBufferAllocChange(rb); - change->data.snapshot = snap; + change->snapshot = snap; change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; ReorderBufferQueueChange(rb, xid, lsn, change, false); @@ -3358,7 +3358,7 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change = ReorderBufferAllocChange(rb); - change->data.command_id = cid; + change->command_id = cid; change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; ReorderBufferQueueChange(rb, xid, lsn, change, false); @@ -3462,11 +3462,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - change->data.tuplecid.locator = locator; - change->data.tuplecid.tid = tid; - change->data.tuplecid.cmin = cmin; - change->data.tuplecid.cmax = cmax; - change->data.tuplecid.combocid = combocid; + change->tuplecid.locator = locator; + change->tuplecid.tid = tid; + change->tuplecid.cmin = cmin; + change->tuplecid.cmax = cmax; + change->tuplecid.combocid = combocid; change->lsn = lsn; change->txn = txn; change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; @@ -3487,10 +3487,10 @@ ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, change = ReorderBufferAllocChange(rb); change->action = REORDER_BUFFER_CHANGE_INVALIDATION; - change->data.inval.ninvalidations = nmsgs; - change->data.inval.invalidations = (SharedInvalidationMessage *) + change->inval.ninvalidations = nmsgs; + change->inval.invalidations = (SharedInvalidationMessage *) palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(change->data.inval.invalidations, msgs, + memcpy(change->inval.invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs); ReorderBufferQueueChange(rb, xid, lsn, change, false); @@ -4095,8 +4095,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Size oldlen = 0; Size newlen = 0; - oldtup = change->data.tp.oldtuple; - newtup = change->data.tp.newtuple; + oldtup = change->tp.oldtuple; + newtup = change->tp.newtuple; if (oldtup) { @@ -4141,9 +4141,9 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_MESSAGE: { char *data; - Size prefix_size = strlen(change->data.msg.prefix) + 1; + Size prefix_size = strlen(change->msg.prefix) + 1; - sz += prefix_size + change->data.msg.message_size + + sz += prefix_size + change->msg.message_size + sizeof(Size) + sizeof(Size); ReorderBufferSerializeReserve(rb, sz); @@ -4155,16 +4155,16 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* write the prefix including the size */ memcpy(data, &prefix_size, sizeof(Size)); data += sizeof(Size); - memcpy(data, change->data.msg.prefix, + memcpy(data, change->msg.prefix, prefix_size); data += prefix_size; /* write the message including the size */ - memcpy(data, &change->data.msg.message_size, sizeof(Size)); + memcpy(data, &change->msg.message_size, sizeof(Size)); data += sizeof(Size); - memcpy(data, change->data.msg.message, - change->data.msg.message_size); - data += change->data.msg.message_size; + memcpy(data, change->msg.message, + change->msg.message_size); + data += change->msg.message_size; break; } @@ -4172,7 +4172,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, { char *data; Size inval_size = sizeof(SharedInvalidationMessage) * - change->data.inval.ninvalidations; + change->inval.ninvalidations; sz += inval_size; @@ -4181,7 +4181,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* might have been reallocated above */ ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(data, change->data.inval.invalidations, inval_size); + memcpy(data, change->inval.invalidations, inval_size); data += inval_size; break; @@ -4191,7 +4191,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snap; char *data; - snap = change->data.snapshot; + snap = change->snapshot; sz += sizeof(SnapshotData) + sizeof(TransactionId) * snap->xcnt + @@ -4227,7 +4227,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data; /* account for the OIDs of truncated relations */ - size = sizeof(Oid) * change->data.truncate.nrelids; + size = sizeof(Oid) * change->truncate.nrelids; sz += size; /* make sure we have enough space */ @@ -4237,7 +4237,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* might have been reallocated above */ ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(data, change->data.truncate.relids, size); + memcpy(data, change->truncate.relids, size); data += size; break; @@ -4454,8 +4454,8 @@ ReorderBufferChangeSize(ReorderBufferChange *change) Size oldlen = 0; Size newlen = 0; - oldtup = change->data.tp.oldtuple; - newtup = change->data.tp.newtuple; + oldtup = change->tp.oldtuple; + newtup = change->tp.newtuple; if (oldtup) { @@ -4475,9 +4475,9 @@ ReorderBufferChangeSize(ReorderBufferChange *change) } case REORDER_BUFFER_CHANGE_MESSAGE: { - Size prefix_size = strlen(change->data.msg.prefix) + 1; + Size prefix_size = strlen(change->msg.prefix) + 1; - sz += prefix_size + change->data.msg.message_size + + sz += prefix_size + change->msg.message_size + sizeof(Size) + sizeof(Size); break; @@ -4485,14 +4485,14 @@ ReorderBufferChangeSize(ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INVALIDATION: { sz += sizeof(SharedInvalidationMessage) * - change->data.inval.ninvalidations; + change->inval.ninvalidations; break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; - snap = change->data.snapshot; + snap = change->snapshot; sz += sizeof(SnapshotData) + sizeof(TransactionId) * snap->xcnt + @@ -4502,7 +4502,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) } case REORDER_BUFFER_CHANGE_TRUNCATE: { - sz += sizeof(Oid) * change->data.truncate.nrelids; + sz += sizeof(Oid) * change->truncate.nrelids; break; } @@ -4688,28 +4688,28 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: - if (change->data.tp.oldtuple) + if (change->tp.oldtuple) { uint32 tuplelen = ((HeapTuple) data)->t_len; - change->data.tp.oldtuple = + change->tp.oldtuple = ReorderBufferAllocTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); /* restore ->tuple */ - memcpy(change->data.tp.oldtuple, data, + memcpy(change->tp.oldtuple, data, sizeof(HeapTupleData)); data += sizeof(HeapTupleData); /* reset t_data pointer into the new tuplebuf */ - change->data.tp.oldtuple->t_data = - (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE); + change->tp.oldtuple->t_data = + (HeapTupleHeader) ((char *) change->tp.oldtuple + HEAPTUPLESIZE); /* restore tuple data itself */ - memcpy(change->data.tp.oldtuple->t_data, data, tuplelen); + memcpy(change->tp.oldtuple->t_data, data, tuplelen); data += tuplelen; } - if (change->data.tp.newtuple) + if (change->tp.newtuple) { /* here, data might not be suitably aligned! */ uint32 tuplelen; @@ -4717,20 +4717,20 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len), sizeof(uint32)); - change->data.tp.newtuple = + change->tp.newtuple = ReorderBufferAllocTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); /* restore ->tuple */ - memcpy(change->data.tp.newtuple, data, + memcpy(change->tp.newtuple, data, sizeof(HeapTupleData)); data += sizeof(HeapTupleData); /* reset t_data pointer into the new tuplebuf */ - change->data.tp.newtuple->t_data = - (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE); + change->tp.newtuple->t_data = + (HeapTupleHeader) ((char *) change->tp.newtuple + HEAPTUPLESIZE); /* restore tuple data itself */ - memcpy(change->data.tp.newtuple->t_data, data, tuplelen); + memcpy(change->tp.newtuple->t_data, data, tuplelen); data += tuplelen; } @@ -4742,33 +4742,33 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* read prefix */ memcpy(&prefix_size, data, sizeof(Size)); data += sizeof(Size); - change->data.msg.prefix = MemoryContextAlloc(rb->context, - prefix_size); - memcpy(change->data.msg.prefix, data, prefix_size); - Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); + change->msg.prefix = MemoryContextAlloc(rb->context, + prefix_size); + memcpy(change->msg.prefix, data, prefix_size); + Assert(change->msg.prefix[prefix_size - 1] == '\0'); data += prefix_size; /* read the message */ - memcpy(&change->data.msg.message_size, data, sizeof(Size)); + memcpy(&change->msg.message_size, data, sizeof(Size)); data += sizeof(Size); - change->data.msg.message = MemoryContextAlloc(rb->context, - change->data.msg.message_size); - memcpy(change->data.msg.message, data, - change->data.msg.message_size); - data += change->data.msg.message_size; + change->msg.message = MemoryContextAlloc(rb->context, + change->msg.message_size); + memcpy(change->msg.message, data, + change->msg.message_size); + data += change->msg.message_size; break; } case REORDER_BUFFER_CHANGE_INVALIDATION: { Size inval_size = sizeof(SharedInvalidationMessage) * - change->data.inval.ninvalidations; + change->inval.ninvalidations; - change->data.inval.invalidations = + change->inval.invalidations = MemoryContextAlloc(rb->context, inval_size); /* read the message */ - memcpy(change->data.inval.invalidations, data, inval_size); + memcpy(change->inval.invalidations, data, inval_size); break; } @@ -4784,9 +4784,9 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(TransactionId) * oldsnap->xcnt + sizeof(TransactionId) * (oldsnap->subxcnt + 0); - change->data.snapshot = MemoryContextAllocZero(rb->context, size); + change->snapshot = MemoryContextAllocZero(rb->context, size); - newsnap = change->data.snapshot; + newsnap = change->snapshot; memcpy(newsnap, data, size); newsnap->xip = (TransactionId *) @@ -4800,9 +4800,9 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, { Oid *relids; - relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids); - memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid)); - change->data.truncate.relids = relids; + relids = ReorderBufferAllocRelids(rb, change->truncate.nrelids); + memcpy(relids, data, change->truncate.nrelids * sizeof(Oid)); + change->truncate.relids = relids; break; } @@ -4991,7 +4991,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(IsToastRelation(relation)); - newtup = change->data.tp.newtuple; + newtup = change->tp.newtuple; chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull)); Assert(!isnull); chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull)); @@ -5090,7 +5090,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, oldcontext = MemoryContextSwitchTo(rb->context); /* we should only have toast tuples in an INSERT or UPDATE */ - Assert(change->data.tp.newtuple); + Assert(change->tp.newtuple); desc = RelationGetDescr(relation); @@ -5106,7 +5106,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, isnull = palloc0(sizeof(bool) * desc->natts); free = palloc0(sizeof(bool) * desc->natts); - newtup = change->data.tp.newtuple; + newtup = change->tp.newtuple; heap_deform_tuple(newtup, desc, attrs, isnull); @@ -5177,7 +5177,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Pointer chunk; cchange = dlist_container(ReorderBufferChange, node, it.cur); - ctup = cchange->data.tp.newtuple; + ctup = cchange->tp.newtuple; chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull)); Assert(!cisnull); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 92eb17049c3..7cd93c21964 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1502,7 +1502,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * identity is not defined for a table. Since the DELETE action * can't be published, we simply return. */ - if (!change->data.tp.oldtuple) + if (!change->tp.oldtuple) { elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); return; @@ -1523,10 +1523,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, targetrel = ancestor; } - if (change->data.tp.oldtuple) + if (change->tp.oldtuple) { old_slot = relentry->old_slot; - ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false); + ExecStoreHeapTuple(change->tp.oldtuple, old_slot, false); /* Convert tuple if needed. */ if (relentry->attrmap) @@ -1538,10 +1538,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } } - if (change->data.tp.newtuple) + if (change->tp.newtuple) { new_slot = relentry->new_slot; - ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false); + ExecStoreHeapTuple(change->tp.newtuple, new_slot, false); /* Convert tuple if needed. */ if (relentry->attrmap) @@ -1684,8 +1684,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, xid, nrelids, relids, - change->data.truncate.cascade, - change->data.truncate.restart_seqs); + change->truncate.cascade, + change->truncate.restart_seqs); OutputPluginWrite(ctx, true); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 91dc7e5e448..bbbdf098c71 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -154,7 +154,7 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; - } data; + }; /* * While in use this is how a change is linked into a transactions, -- 2.51.0