From f4360963fdc2d9ff76cb89b4324f94e04bd79724 Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 5 Dec 2019 09:17:06 +0900 Subject: [PATCH v10 2/4] Some refactoring of logical/worker.c This moves the main operations of apply_handle_{insert|update|delete}, that of inserting, updating, deleting a tuple into/from a given relation, into corresponding apply_handle_do_{insert|update|delete} functions, to perform those operations on relations that are not direct targets of replication. An example of that is when replicating changes into a partitioned table, some of which must be applied to its partitions. --- src/backend/replication/logical/worker.c | 261 ++++++++++++++++++------------- 1 file changed, 153 insertions(+), 108 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7a5471f95c..86601f6e8f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -578,6 +578,148 @@ GetRelationIdentityOrPK(Relation rel) return idxoid; } +/* Workhorse for apply_handle_insert() */ +static void +apply_handle_do_insert(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *localslot) +{ + ExecOpenIndices(relinfo, false); + + /* Do the insert. */ + ExecSimpleRelationInsert(estate, localslot); + + /* Cleanup. */ + ExecCloseIndices(relinfo); +} + +/* Workhorse for apply_handle_update() */ +static void +apply_handle_do_update(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry) +{ + Relation rel = relinfo->ri_RelationDesc; + LogicalRepRelation *remoterel = &relmapentry->remoterel; + Oid idxoid; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + MemoryContext oldctx; + + localslot = table_slot_create(rel, &estate->es_tupleTable); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + + ExecOpenIndices(relinfo, false); + + /* + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. + */ + idxoid = GetRelationIdentityOrPK(rel); + Assert(OidIsValid(idxoid) || + (remoterel->replident == REPLICA_IDENTITY_FULL)); + + if (OidIsValid(idxoid)) + found = RelationFindReplTupleByIndex(rel, idxoid, + LockTupleExclusive, + remoteslot, localslot); + else + found = RelationFindReplTupleSeq(rel, LockTupleExclusive, + remoteslot, localslot); + + ExecClearTuple(remoteslot); + + /* + * Tuple found. + * + * Note this will fail if there are other conflicting unique indexes. + */ + if (found) + { + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_cstrings(remoteslot, localslot, relmapentry, + newtup->values, newtup->changed); + MemoryContextSwitchTo(oldctx); + + EvalPlanQualSetSlot(&epqstate, remoteslot); + + /* Do the actual update. */ + ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot); + } + else + { + /* + * The tuple to be updated could not be found. + * + * TODO what to do here, change the log level to LOG perhaps? + */ + elog(DEBUG1, + "logical replication did not find row for update " + "in replication target relation \"%s\"", + RelationGetRelationName(rel)); + } + + /* Cleanup. */ + ExecCloseIndices(relinfo); + EvalPlanQualEnd(&epqstate); +} + +/* Workhorse for apply_handle_delete() */ +static void +apply_handle_do_delete(ResultRelInfo *relinfo, EState *estate, + TupleTableSlot *remoteslot, + LogicalRepRelation *remoterel) +{ + Relation rel = relinfo->ri_RelationDesc; + Oid idxoid; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + + localslot = table_slot_create(rel, &estate->es_tupleTable); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + + /* + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. + */ + idxoid = GetRelationIdentityOrPK(rel); + Assert(OidIsValid(idxoid) || + (remoterel->replident == REPLICA_IDENTITY_FULL)); + + if (OidIsValid(idxoid)) + found = RelationFindReplTupleByIndex(rel, idxoid, + LockTupleExclusive, + remoteslot, localslot); + else + found = RelationFindReplTupleSeq(rel, LockTupleExclusive, + remoteslot, localslot); + ExecOpenIndices(relinfo, false); + + /* If found delete it. */ + if (found) + { + EvalPlanQualSetSlot(&epqstate, localslot); + + /* Do the actual delete. */ + ExecSimpleRelationDelete(estate, &epqstate, localslot); + } + else + { + /* The tuple to be deleted could not be found. */ + elog(DEBUG1, + "logical replication could not find row for delete " + "in replication target relation \"%s\"", + RelationGetRelationName(rel)); + } + + /* Cleanup. */ + ExecCloseIndices(relinfo); + EvalPlanQualEnd(&epqstate); +} + /* * Handle INSERT message. */ @@ -620,13 +762,10 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - ExecOpenIndices(estate->es_result_relation_info, false); - - /* Do the insert. */ - ExecSimpleRelationInsert(estate, remoteslot); + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_do_insert(estate->es_result_relation_info, estate, + remoteslot); - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ @@ -683,16 +822,12 @@ apply_handle_update(StringInfo s) { LogicalRepRelMapEntry *rel; LogicalRepRelId relid; - Oid idxoid; EState *estate; - EPQState epqstate; LogicalRepTupleData oldtup; LogicalRepTupleData newtup; bool has_oldtup; - TupleTableSlot *localslot; TupleTableSlot *remoteslot; RangeTblEntry *target_rte; - bool found; MemoryContext oldctx; ensure_transaction(); @@ -718,9 +853,6 @@ apply_handle_update(StringInfo s) remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); - localslot = table_slot_create(rel->localrel, - &estate->es_tupleTable); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); /* * Populate updatedCols so that per-column triggers can fire. This could @@ -738,7 +870,6 @@ apply_handle_update(StringInfo s) } PushActiveSnapshot(GetTransactionSnapshot()); - ExecOpenIndices(estate->es_result_relation_info, false); /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); @@ -746,63 +877,15 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - /* - * Try to find tuple using either replica identity index, primary key or - * if needed, sequential scan. - */ - idxoid = GetRelationIdentityOrPK(rel->localrel); - Assert(OidIsValid(idxoid) || - (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup)); - - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(rel->localrel, idxoid, - LockTupleExclusive, - remoteslot, localslot); - else - found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, - remoteslot, localslot); - - ExecClearTuple(remoteslot); - - /* - * Tuple found. - * - * Note this will fail if there are other conflicting unique indexes. - */ - if (found) - { - /* Process and store remote tuple in the slot */ - oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, rel, - newtup.values, newtup.changed); - MemoryContextSwitchTo(oldctx); - - EvalPlanQualSetSlot(&epqstate, remoteslot); - - /* Do the actual update. */ - ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot); - } - else - { - /* - * The tuple to be updated could not be found. - * - * TODO what to do here, change the log level to LOG perhaps? - */ - elog(DEBUG1, - "logical replication did not find row for update " - "in replication target relation \"%s\"", - RelationGetRelationName(rel->localrel)); - } + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_do_update(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel); - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ AfterTriggerEndQuery(estate); - EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); @@ -822,12 +905,8 @@ apply_handle_delete(StringInfo s) LogicalRepRelMapEntry *rel; LogicalRepTupleData oldtup; LogicalRepRelId relid; - Oid idxoid; EState *estate; - EPQState epqstate; TupleTableSlot *remoteslot; - TupleTableSlot *localslot; - bool found; MemoryContext oldctx; ensure_transaction(); @@ -852,58 +931,24 @@ apply_handle_delete(StringInfo s) remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); - localslot = table_slot_create(rel->localrel, - &estate->es_tupleTable); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + /* Input functions may need an active snapshot, so get one */ PushActiveSnapshot(GetTransactionSnapshot()); - ExecOpenIndices(estate->es_result_relation_info, false); - /* Find the tuple using the replica identity index. */ + /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, oldtup.values); MemoryContextSwitchTo(oldctx); - /* - * Try to find tuple using either replica identity index, primary key or - * if needed, sequential scan. - */ - idxoid = GetRelationIdentityOrPK(rel->localrel); - Assert(OidIsValid(idxoid) || - (rel->remoterel.replident == REPLICA_IDENTITY_FULL)); - - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(rel->localrel, idxoid, - LockTupleExclusive, - remoteslot, localslot); - else - found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, - remoteslot, localslot); - /* If found delete it. */ - if (found) - { - EvalPlanQualSetSlot(&epqstate, localslot); - - /* Do the actual delete. */ - ExecSimpleRelationDelete(estate, &epqstate, localslot); - } - else - { - /* The tuple to be deleted could not be found. */ - elog(DEBUG1, - "logical replication could not find row for delete " - "in replication target relation \"%s\"", - RelationGetRelationName(rel->localrel)); - } + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_do_delete(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ AfterTriggerEndQuery(estate); - EvalPlanQualEnd(&epqstate); ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); -- 2.16.5