From 8126a6bb784506180ba1d9c4985aabe124ffc63e Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 5 Dec 2019 09:17:06 +0900 Subject: [PATCH v8 3/4] Some refactoring of logical/worker.c --- src/backend/replication/logical/worker.c | 291 ++++++++++++++++++------------- 1 file changed, 170 insertions(+), 121 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7a5471f95c..34b0ac78cc 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -90,7 +90,8 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); typedef struct SlotErrCallbackArg { - LogicalRepRelMapEntry *rel; + LogicalRepRelation *remoterel; + Oid local_reloid; int local_attnum; int remote_attnum; } SlotErrCallbackArg; @@ -268,7 +269,6 @@ static void slot_store_error_callback(void *arg) { SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; - LogicalRepRelMapEntry *rel; char *remotetypname; Oid remotetypoid, localtypoid; @@ -277,19 +277,18 @@ slot_store_error_callback(void *arg) if (errarg->remote_attnum < 0) return; - rel = errarg->rel; - remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum]; + remotetypoid = errarg->remoterel->atttyps[errarg->remote_attnum]; /* Fetch remote type name from the LogicalRepTypMap cache */ remotetypname = logicalrep_typmap_gettypname(remotetypoid); /* Fetch local type OID from the local sys cache */ - localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1); + localtypoid = get_atttype(errarg->local_reloid, errarg->local_attnum + 1); errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", " "remote type %s, local type %s", - rel->remoterel.nspname, rel->remoterel.relname, - rel->remoterel.attnames[errarg->remote_attnum], + errarg->remoterel->nspname, errarg->remoterel->relname, + errarg->remoterel->attnames[errarg->remote_attnum], remotetypname, format_type_be(localtypoid)); } @@ -311,7 +310,8 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, ExecClearTuple(slot); /* Push callback + info on the error context stack */ - errarg.rel = rel; + errarg.remoterel = &rel->remoterel; + errarg.local_reloid = rel->localreloid; errarg.local_attnum = -1; errarg.remote_attnum = -1; errcallback.callback = slot_store_error_callback; @@ -375,8 +375,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, */ static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) + char **values, bool *replaces, + AttrMap *attrmap, LogicalRepRelation *remoterel, + Oid local_reloid) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -396,7 +397,8 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool)); /* For error reporting, push callback + info on the error context stack */ - errarg.rel = rel; + errarg.remoterel = remoterel; + errarg.local_reloid = local_reloid; errarg.local_attnum = -1; errarg.remote_attnum = -1; errcallback.callback = slot_store_error_callback; @@ -405,11 +407,11 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, error_context_stack = &errcallback; /* Call the "in" function for each replaced attribute */ - Assert(natts == rel->attrmap->maplen); + Assert(natts == attrmap->maplen); for (i = 0; i < natts; i++) { Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); - int remoteattnum = rel->attrmap->attnums[i]; + int remoteattnum = attrmap->attnums[i]; if (remoteattnum < 0) continue; @@ -578,6 +580,148 @@ GetRelationIdentityOrPK(Relation rel) return idxoid; } +/* Workhorse for apply_handle_insert() */ +static void +apply_handle_do_insert(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *localslot) +{ + ExecOpenIndices(relinfo, false); + + /* Do the insert. */ + ExecSimpleRelationInsert(estate, localslot); + + /* Cleanup. */ + ExecCloseIndices(relinfo); +} + +/* Workhorse for apply_handle_update() */ +static void +apply_handle_do_update(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + AttrMap *attrmap, LogicalRepRelation *remoterel) +{ + Relation rel = relinfo->ri_RelationDesc; + Oid idxoid; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + MemoryContext oldctx; + + localslot = table_slot_create(rel, &estate->es_tupleTable); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + + ExecOpenIndices(relinfo, false); + + /* + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. + */ + idxoid = GetRelationIdentityOrPK(rel); + Assert(OidIsValid(idxoid) || + (remoterel->replident == REPLICA_IDENTITY_FULL)); + + if (OidIsValid(idxoid)) + found = RelationFindReplTupleByIndex(rel, idxoid, + LockTupleExclusive, + remoteslot, localslot); + else + found = RelationFindReplTupleSeq(rel, LockTupleExclusive, + remoteslot, localslot); + + ExecClearTuple(remoteslot); + + /* + * Tuple found. + * + * Note this will fail if there are other conflicting unique indexes. + */ + if (found) + { + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_cstrings(remoteslot, localslot, + newtup->values, newtup->changed, + attrmap, remoterel, RelationGetRelid(rel)); + MemoryContextSwitchTo(oldctx); + + EvalPlanQualSetSlot(&epqstate, remoteslot); + + /* Do the actual update. */ + ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot); + } + else + { + /* + * The tuple to be updated could not be found. + * + * TODO what to do here, change the log level to LOG perhaps? + */ + elog(DEBUG1, + "logical replication did not find row for update " + "in replication target relation \"%s\"", + RelationGetRelationName(rel)); + } + + /* Cleanup. */ + ExecCloseIndices(relinfo); + EvalPlanQualEnd(&epqstate); +} + +/* Workhorse for apply_handle_delete() */ +static void +apply_handle_do_delete(ResultRelInfo *relinfo, EState *estate, + TupleTableSlot *remoteslot, + LogicalRepRelation *remoterel) +{ + Relation rel = relinfo->ri_RelationDesc; + Oid idxoid; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + + localslot = table_slot_create(rel, &estate->es_tupleTable); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + + /* + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. + */ + idxoid = GetRelationIdentityOrPK(rel); + Assert(OidIsValid(idxoid) || + (remoterel->replident == REPLICA_IDENTITY_FULL)); + + if (OidIsValid(idxoid)) + found = RelationFindReplTupleByIndex(rel, idxoid, + LockTupleExclusive, + remoteslot, localslot); + else + found = RelationFindReplTupleSeq(rel, LockTupleExclusive, + remoteslot, localslot); + ExecOpenIndices(relinfo, false); + + /* If found delete it. */ + if (found) + { + EvalPlanQualSetSlot(&epqstate, localslot); + + /* Do the actual delete. */ + ExecSimpleRelationDelete(estate, &epqstate, localslot); + } + else + { + /* The tuple to be deleted could not be found. */ + elog(DEBUG1, + "logical replication could not find row for delete " + "in replication target relation \"%s\"", + RelationGetRelationName(rel)); + } + + /* Cleanup. */ + ExecCloseIndices(relinfo); + EvalPlanQualEnd(&epqstate); +} + /* * Handle INSERT message. */ @@ -620,13 +764,10 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - ExecOpenIndices(estate->es_result_relation_info, false); - - /* Do the insert. */ - ExecSimpleRelationInsert(estate, remoteslot); + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_do_insert(estate->es_result_relation_info, estate, + remoteslot); - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ @@ -683,16 +824,12 @@ apply_handle_update(StringInfo s) { LogicalRepRelMapEntry *rel; LogicalRepRelId relid; - Oid idxoid; EState *estate; - EPQState epqstate; LogicalRepTupleData oldtup; LogicalRepTupleData newtup; bool has_oldtup; - TupleTableSlot *localslot; TupleTableSlot *remoteslot; RangeTblEntry *target_rte; - bool found; MemoryContext oldctx; ensure_transaction(); @@ -718,9 +855,6 @@ apply_handle_update(StringInfo s) remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); - localslot = table_slot_create(rel->localrel, - &estate->es_tupleTable); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); /* * Populate updatedCols so that per-column triggers can fire. This could @@ -738,7 +872,6 @@ apply_handle_update(StringInfo s) } PushActiveSnapshot(GetTransactionSnapshot()); - ExecOpenIndices(estate->es_result_relation_info, false); /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); @@ -746,63 +879,16 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - /* - * Try to find tuple using either replica identity index, primary key or - * if needed, sequential scan. - */ - idxoid = GetRelationIdentityOrPK(rel->localrel); - Assert(OidIsValid(idxoid) || - (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup)); - - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(rel->localrel, idxoid, - LockTupleExclusive, - remoteslot, localslot); - else - found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, - remoteslot, localslot); - - ExecClearTuple(remoteslot); - - /* - * Tuple found. - * - * Note this will fail if there are other conflicting unique indexes. - */ - if (found) - { - /* Process and store remote tuple in the slot */ - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, rel, - newtup.values, newtup.changed); - MemoryContextSwitchTo(oldctx); - - EvalPlanQualSetSlot(&epqstate, remoteslot); - - /* Do the actual update. */ - ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot); - } - else - { - /* - * The tuple to be updated could not be found. - * - * TODO what to do here, change the log level to LOG perhaps? - */ - elog(DEBUG1, - "logical replication did not find row for update " - "in replication target relation \"%s\"", - RelationGetRelationName(rel->localrel)); - } + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_do_update(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel->attrmap, + &rel->remoterel); - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ AfterTriggerEndQuery(estate); - EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); @@ -822,12 +908,8 @@ apply_handle_delete(StringInfo s) LogicalRepRelMapEntry *rel; LogicalRepTupleData oldtup; LogicalRepRelId relid; - Oid idxoid; EState *estate; - EPQState epqstate; TupleTableSlot *remoteslot; - TupleTableSlot *localslot; - bool found; MemoryContext oldctx; ensure_transaction(); @@ -852,58 +934,25 @@ apply_handle_delete(StringInfo s) remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); - localslot = table_slot_create(rel->localrel, - &estate->es_tupleTable); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + /* Input functions may need an active snapshot, so get one */ PushActiveSnapshot(GetTransactionSnapshot()); - ExecOpenIndices(estate->es_result_relation_info, false); - /* Find the tuple using the replica identity index. */ + /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - /* - * Try to find tuple using either replica identity index, primary key or - * if needed, sequential scan. - */ - idxoid = GetRelationIdentityOrPK(rel->localrel); - Assert(OidIsValid(idxoid) || - (rel->remoterel.replident == REPLICA_IDENTITY_FULL)); - - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(rel->localrel, idxoid, - LockTupleExclusive, - remoteslot, localslot); - else - found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, - remoteslot, localslot); - /* If found delete it. */ - if (found) - { - EvalPlanQualSetSlot(&epqstate, localslot); - - /* Do the actual delete. */ - ExecSimpleRelationDelete(estate, &epqstate, localslot); - } - else - { - /* The tuple to be deleted could not be found. */ - elog(DEBUG1, - "logical replication could not find row for delete " - "in replication target relation \"%s\"", - RelationGetRelationName(rel->localrel)); - } + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_do_delete(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ AfterTriggerEndQuery(estate); - EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); -- 2.16.5