From e0b5ad061bed9e5a6fcf889124651f6c9fc329cd Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 16 Dec 2019 14:30:42 +0100 Subject: [PATCH v2] Have logical replication subscriber fire column triggers The logical replication apply worker did not fire per-column update triggers because the updatedCols bitmap in the RTE was not populated. This fixes that. --- src/backend/replication/logical/worker.c | 16 +++++++++++ src/test/subscription/t/003_constraints.pl | 32 +++++++++++++++++++--- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ced0d191c2..99b2be0835 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -690,6 +690,7 @@ apply_handle_update(StringInfo s) bool has_oldtup; TupleTableSlot *localslot; TupleTableSlot *remoteslot; + RangeTblEntry *target_rte; bool found; MemoryContext oldctx; @@ -720,6 +721,21 @@ apply_handle_update(StringInfo s) &estate->es_tupleTable); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + /* + * Populate updatedCols so that per-column triggers can fire. This could + * include more columns than were actually changed on the publisher + * because the logical replication protocol doesn't contain that + * information. But it would for example exclude columns that only exist + * on the subscriber, since we are not touching those. + */ + target_rte = list_nth(estate->es_range_table, 0); + for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) + { + if (newtup.changed[i]) + target_rte->updatedCols = bms_add_member(target_rte->updatedCols, + i + 1 - FirstLowInvalidHeapAttributeNumber); + } + PushActiveSnapshot(GetTransactionSnapshot()); ExecOpenIndices(estate->es_result_relation_info, false); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index 81547f65fa..34ab11e7fe 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -3,7 +3,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 4; +use Test::More tests => 6; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -81,6 +81,8 @@ BEGIN ELSE RETURN NULL; END IF; + ELSIF (TG_OP = 'UPDATE') THEN + RETURN NULL; ELSE RAISE WARNING 'Unknown action'; RETURN NULL; @@ -88,7 +90,7 @@ BEGIN END; \$\$ LANGUAGE plpgsql; CREATE TRIGGER filter_basic_dml_trg - BEFORE INSERT ON tab_fk_ref + BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; }); @@ -99,10 +101,32 @@ BEGIN $node_publisher->wait_for_catchup('tap_sub'); -# The row should be skipped on subscriber +# The trigger should cause the insert to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber'); + +# Update data +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# The trigger should cause the update to be skipped on subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); -is($result, qq(2|1|2), 'check replica trigger applied on subscriber'); +is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber'); + +# Update on a column not specified in the trigger, but it will trigger +# anyway because logical replication ships all columns in an update. +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check column trigger applied on even for other column'); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.24.1