From 53e41c5bdd89dbaf5022f59169d0de483f82babc Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu Date: Sun, 30 Nov 2025 16:48:55 +0100 Subject: [PATCH v15 1/2] Modify the ExecInitPartitionInfo function to consider partitioned indexes that are potentially processed by REINDEX CONCURRENTLY as arbiters as well. This is necessary to ensure that all concurrent transactions use the same set of arbiter indexes. --- src/backend/executor/execPartition.c | 127 +++++++++- src/test/modules/injection_points/Makefile | 3 +- ...eindex-concurrently-upsert-partitioned.out | 232 ++++++++++++++++++ src/test/modules/injection_points/meson.build | 1 + ...index-concurrently-upsert-partitioned.spec | 96 ++++++++ 5 files changed, 446 insertions(+), 13 deletions(-) create mode 100644 src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out create mode 100644 src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 0dcce181f09..bd3527393ec 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -15,6 +15,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "catalog/index.h" #include "catalog/partition.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -490,6 +491,62 @@ ExecFindPartition(ModifyTableState *mtstate, return rri; } +/* + * IsIndexCompatibleAsArbiter + * Checks if the indexes are identical in terms of being used + * as arbiters for the INSERT ON CONFLICT operation by comparing + * them to the provided arbiter index. + * + * Only indexes of the same relation are supported. + * + * Returns the true if indexes are compatible. + */ +static bool +IsIndexCompatibleAsArbiter(Relation arbiterIndexRelation, + IndexInfo *arbiterIndexInfo, + Relation indexRelation, + IndexInfo *indexInfo) +{ + int i; + + Assert(arbiterIndexRelation->rd_index->indrelid == indexRelation->rd_index->indrelid); + + if (arbiterIndexInfo->ii_Unique != indexInfo->ii_Unique) + return false; + + if (arbiterIndexInfo->ii_NullsNotDistinct != indexInfo->ii_NullsNotDistinct) + return false; + + /* and same number of key attributes */ + if (arbiterIndexInfo->ii_NumIndexKeyAttrs != indexInfo->ii_NumIndexKeyAttrs) + return false; + + /* No support currently for comparing exclusion indexes. */ + if (arbiterIndexInfo->ii_ExclusionOps != NULL || indexInfo->ii_ExclusionOps != NULL) + return false; + + for (i = 0; i < arbiterIndexInfo->ii_NumIndexKeyAttrs; i++) + { + if (arbiterIndexRelation->rd_indcollation[i] != indexRelation->rd_indcollation[i]) + return false; + + if (arbiterIndexRelation->rd_opfamily[i] != indexRelation->rd_opfamily[i]) + return false; + + if (arbiterIndexRelation->rd_index->indkey.values[i] != indexRelation->rd_index->indkey.values[i]) + return false; + } + + if (list_difference(RelationGetIndexExpressions(arbiterIndexRelation), + RelationGetIndexExpressions(indexRelation)) != NIL) + return false; + + if (list_difference(RelationGetIndexPredicate(arbiterIndexRelation), + RelationGetIndexPredicate(indexRelation)) != NIL) + return false; + return true; +} + /* * ExecInitPartitionInfo * Lock the partition and initialize ResultRelInfo. Also setup other @@ -690,7 +747,9 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, TupleDesc partrelDesc = RelationGetDescr(partrel); ExprContext *econtext = mtstate->ps.ps_ExprContext; ListCell *lc; - List *arbiterIndexes = NIL; + List *arbiterIndexes = NIL, + *arbiterIndexesOffset = NIL; + int additional_arbiters = 0; /* * If there is a list of arbiter indexes, map it to a list of indexes @@ -698,36 +757,80 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, * list and searching for ancestry relationships to each index in the * ancestor table. */ - if (rootResultRelInfo->ri_onConflictArbiterIndexes != NIL) - { - List *childIdxs; + if (rootResultRelInfo->ri_onConflictArbiterIndexes != NIL) { + List *childIdxs, + *nonAncestorIdxOffset = NIL; childIdxs = RelationGetIndexList(leaf_part_rri->ri_RelationDesc); foreach(lc, childIdxs) { Oid childIdx = lfirst_oid(lc); + int i = foreach_current_index(lc); List *ancestors; - ListCell *lc2; ancestors = get_partition_ancestors(childIdx); - foreach(lc2, rootResultRelInfo->ri_onConflictArbiterIndexes) + if (ancestors) { - if (list_member_oid(ancestors, lfirst_oid(lc2))) - arbiterIndexes = lappend_oid(arbiterIndexes, childIdx); + foreach_oid(oid, rootResultRelInfo->ri_onConflictArbiterIndexes) + { + if (list_member_oid(ancestors, oid)) + { + arbiterIndexes = lappend_oid(arbiterIndexes, childIdx); + arbiterIndexesOffset = lappend_int(arbiterIndexesOffset, i); + } + } } + else /* No ancestor was found for that index. Save it for rechecking later. */ + nonAncestorIdxOffset = lappend_int(nonAncestorIdxOffset, i); + list_free(ancestors); } + + /* + * If any non-ancestor indexes are found, we need to compare them with other + * indexes of the relation that will be used as arbiters. This is necessary + * when a partitioned index is processed by REINDEX CONCURRENTLY. Both indexes + * must be considered as arbiters to ensure that all concurrent transactions + * use the same set of arbiters. + */ + if (nonAncestorIdxOffset && arbiterIndexesOffset) + { + foreach_int(childIdxOffset, nonAncestorIdxOffset) + { + Relation nonAncestorIndexRelation = leaf_part_rri->ri_IndexRelationDescs[childIdxOffset]; + IndexInfo *nonAncestorIndexInfo = leaf_part_rri->ri_IndexRelationInfo[childIdxOffset]; + Assert(!list_member_oid(arbiterIndexes, nonAncestorIndexRelation->rd_index->indexrelid)); + + /* It is too early to use non-ready indexes as arbiters */ + if (!nonAncestorIndexInfo->ii_ReadyForInserts) + continue; + + foreach_int(arbiterIdxOffset, arbiterIndexesOffset) + { + Relation arbiterIndexRelation = leaf_part_rri->ri_IndexRelationDescs[arbiterIdxOffset]; + IndexInfo *arbiterIndexInfo = leaf_part_rri->ri_IndexRelationInfo[arbiterIdxOffset]; + + /* If non-ancestor index are compatible to arbiter - use it as arbiter too. */ + if (IsIndexCompatibleAsArbiter(arbiterIndexRelation, arbiterIndexInfo, + nonAncestorIndexRelation, nonAncestorIndexInfo)) + { + arbiterIndexes = lappend_oid(arbiterIndexes, + nonAncestorIndexRelation->rd_index->indexrelid); + additional_arbiters++; + } + } + } + } + list_free(nonAncestorIdxOffset); } /* * If the resulting lists are of inequal length, something is wrong. - * XXX This may happen because we don't match the lists correctly when - * a partitioned index is being processed by REINDEX CONCURRENTLY. - * FIXME later. + * But we need to consider additional arbiter indexes also. */ if (list_length(rootResultRelInfo->ri_onConflictArbiterIndexes) != - list_length(arbiterIndexes)) + list_length(arbiterIndexes) - additional_arbiters) elog(ERROR, "invalid arbiter index list"); leaf_part_rri->ri_onConflictArbiterIndexes = arbiterIndexes; diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile index 7b3c0c4b716..e10475d2665 100644 --- a/src/test/modules/injection_points/Makefile +++ b/src/test/modules/injection_points/Makefile @@ -19,7 +19,8 @@ ISOLATION = basic \ syscache-update-pruned \ index-concurrently-upsert \ reindex-concurrently-upsert \ - index-concurrently-upsert-predicate + index-concurrently-upsert-predicate \ + reindex-concurrently-upsert-partitioned TAP_TESTS = 1 diff --git a/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out b/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out new file mode 100644 index 00000000000..588ffbd57a5 --- /dev/null +++ b/src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out @@ -0,0 +1,232 @@ +Parsed test spec with 4 sessions + +starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s4_wakeup_to_set_dead s2_start_upsert s4_wakeup_s1 s4_wakeup_s2 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_set_dead: + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; +step s1_start_upsert: + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); + +step s4_wakeup_to_set_dead: + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); + +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: <... completed> +step s3_start_reindex: <... completed> + +starting permutation: s3_setup_wait_before_swap s3_start_reindex s1_start_upsert s4_wakeup_to_swap s2_start_upsert s4_wakeup_s2 s4_wakeup_s1 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_swap: + SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; +step s1_start_upsert: + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); + +step s4_wakeup_to_swap: + SELECT injection_points_detach('reindex-relation-concurrently-before-swap'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); + +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s2_start_upsert: <... completed> +step s3_start_reindex: <... completed> + +starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s2_start_upsert s4_wakeup_s1 s4_wakeup_to_set_dead s4_wakeup_s2 +injection_points_attach +----------------------- + +(1 row) + +injection_points_attach +----------------------- + +(1 row) + +injection_points_set_local +-------------------------- + +(1 row) + +step s3_setup_wait_before_set_dead: + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); + +injection_points_attach +----------------------- + +(1 row) + +step s3_start_reindex: REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; +step s1_start_upsert: + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); + +step s2_start_upsert: + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); + +step s4_wakeup_s1: + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s1_start_upsert: <... completed> +step s4_wakeup_to_set_dead: + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s4_wakeup_s2: + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); + +injection_points_detach +----------------------- + +(1 row) + +injection_points_wakeup +----------------------- + +(1 row) + +step s2_start_upsert: <... completed> +step s3_start_reindex: <... completed> diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build index 485b483e3ca..3e06220fe31 100644 --- a/src/test/modules/injection_points/meson.build +++ b/src/test/modules/injection_points/meson.build @@ -51,6 +51,7 @@ tests += { 'index-concurrently-upsert', 'reindex-concurrently-upsert', 'index-concurrently-upsert-predicate', + 'reindex-concurrently-upsert-partitioned' ], 'runningcheck': false, # see syscache-update-pruned # Some tests wait for all snapshots, so avoid parallel execution diff --git a/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec b/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec new file mode 100644 index 00000000000..c8a8eb9cca5 --- /dev/null +++ b/src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec @@ -0,0 +1,96 @@ +# Test race conditions involving: +# - s1: UPSERT a tuple +# - s2: UPSERT the same tuple +# - s3: REINDEX concurrent primary key index +# - s4: operations with injection points + +setup +{ + CREATE EXTENSION injection_points; + CREATE SCHEMA test; + CREATE TABLE test.tbl(i int primary key, updated_at timestamp) PARTITION BY RANGE (i); + CREATE TABLE test.tbl_partition PARTITION OF test.tbl + FOR VALUES FROM (0) TO (10000) + WITH (parallel_workers = 0); +} + +teardown +{ + DROP SCHEMA test CASCADE; + DROP EXTENSION injection_points; +} + +session s1 +setup { + SELECT injection_points_set_local(); + SELECT injection_points_attach('check-exclusion-or-unique-constraint-no-conflict', 'wait'); +} +step s1_start_upsert { + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); +} + +session s2 +setup { + SELECT injection_points_set_local(); + SELECT injection_points_attach('exec-insert-before-insert-speculative', 'wait'); +} +step s2_start_upsert { + INSERT INTO test.tbl VALUES(13,now()) on conflict(i) do update set updated_at = now(); +} + +session s3 +setup { + SELECT injection_points_set_local(); +} +step s3_setup_wait_before_set_dead { + SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait'); +} +step s3_setup_wait_before_swap { + SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait'); +} +step s3_start_reindex { REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey; } + +session s4 +step s4_wakeup_to_swap { + SELECT injection_points_detach('reindex-relation-concurrently-before-swap'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap'); +} +step s4_wakeup_s1 { + SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict'); + SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict'); +} +step s4_wakeup_s2 { + SELECT injection_points_detach('exec-insert-before-insert-speculative'); + SELECT injection_points_wakeup('exec-insert-before-insert-speculative'); +} +step s4_wakeup_to_set_dead { + SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead'); + SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead'); +} + +permutation + s3_setup_wait_before_set_dead + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s4_wakeup_to_set_dead + s2_start_upsert(s1_start_upsert) + s4_wakeup_s1 + s4_wakeup_s2 + +permutation + s3_setup_wait_before_swap + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s4_wakeup_to_swap + s2_start_upsert(s1_start_upsert) + s4_wakeup_s2 + s4_wakeup_s1 + +permutation + s3_setup_wait_before_set_dead + s3_start_reindex(s1_start_upsert, s2_start_upsert) + s1_start_upsert + s2_start_upsert(s1_start_upsert) + s4_wakeup_s1 + s4_wakeup_to_set_dead + s4_wakeup_s2 \ No newline at end of file -- 2.43.0