From 7463a354906744ec78d1537be6a3993f632f8a3c Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Sat, 24 Feb 2018 17:03:57 -0500 Subject: [PATCH v2] Handle heap rewrites even better in logical decoding Logical decoding should not publish anything about tables created as part of a heap rewrite during DDL. Those tables don't exist externally, so consumers of logical decoding cannot do anything sensible with that information. In ab28feae2bd3d4629bd73ae3548e671c57d785f0, we worked around this for built-in logical replication, but that was hack. This is a more proper fix: We mark such transient heaps using the new field pg_class.relwrite, linking to the original relation OID. By default, we ignore them in logical decoding before they get to the output plugin. Optionally, a plugin can register their interest in getting such changes, if the handle DDL specially, in which case the new field will help them get information about the actual table. --- .../test_decoding/expected/concurrent_ddl_dml.out | 82 ++++++++-------------- contrib/test_decoding/expected/ddl.out | 20 +++--- .../test_decoding/specs/concurrent_ddl_dml.spec | 2 +- contrib/test_decoding/sql/ddl.sql | 5 +- contrib/test_decoding/test_decoding.c | 14 ++++ doc/src/sgml/catalogs.sgml | 12 ++++ doc/src/sgml/logicaldecoding.sgml | 5 ++ src/backend/bootstrap/bootparse.y | 1 + src/backend/catalog/heap.c | 4 ++ src/backend/catalog/toasting.c | 1 + src/backend/commands/cluster.c | 1 + src/backend/commands/tablecmds.c | 1 + src/backend/replication/logical/logical.c | 4 ++ src/backend/replication/logical/reorderbuffer.c | 7 ++ src/backend/replication/pgoutput/pgoutput.c | 26 ------- src/include/catalog/heap.h | 1 + src/include/catalog/pg_class.h | 22 +++--- src/include/replication/output_plugin.h | 1 + src/include/replication/reorderbuffer.h | 2 + 19 files changed, 109 insertions(+), 102 deletions(-) diff --git a/contrib/test_decoding/expected/concurrent_ddl_dml.out b/contrib/test_decoding/expected/concurrent_ddl_dml.out index a15bfa292e..1f9e7661b7 100644 --- a/contrib/test_decoding/expected/concurrent_ddl_dml.out +++ b/contrib/test_decoding/expected/concurrent_ddl_dml.out @@ -10,7 +10,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_float: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float; step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -32,16 +32,13 @@ step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1 -COMMIT ?column? stop @@ -56,7 +53,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_char: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -78,16 +75,13 @@ step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varyi step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; step s2_alter_tbl1_char: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' -COMMIT ?column? stop @@ -103,16 +97,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; step s1_commit: COMMIT; step s2_alter_tbl1_float: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1 -COMMIT ?column? stop @@ -128,16 +119,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; step s1_commit: COMMIT; step s2_alter_tbl1_char: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' -COMMIT ?column? stop @@ -154,16 +142,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; step s1_commit: COMMIT; step s2_alter_tbl1_float: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[double precision]:1 COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1 -COMMIT ?column? stop @@ -180,16 +165,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; step s1_commit: COMMIT; step s2_alter_tbl1_char: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[character varying]:'1' COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' -COMMIT ?column? stop @@ -205,7 +187,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_text: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text; step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -229,16 +211,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; step s1_commit: COMMIT; step s2_alter_tbl1_char: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1 table public.tbl2: INSERT: val1[integer]:1 val2[text]:'1' COMMIT -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1' -COMMIT ?column? stop @@ -254,7 +233,7 @@ step s2_alter_tbl2_boolean: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; ERROR: column "val2" cannot be cast automatically to type boolean step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -279,7 +258,7 @@ step s2_alter_tbl1_boolean: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE boolean; error in steps s1_commit s2_alter_tbl1_boolean: ERROR: column "val2" cannot be cast automatically to type boolean -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -300,7 +279,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -324,7 +303,7 @@ step s1_begin: BEGIN; step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -348,7 +327,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_add_float: ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -372,7 +351,7 @@ step s1_begin: BEGIN; step s2_alter_tbl2_add_float: ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -396,7 +375,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -420,7 +399,7 @@ step s1_begin: BEGIN; step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -445,7 +424,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; step s1_commit: COMMIT; step s2_alter_tbl2_drop_3rd_col: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -468,7 +447,7 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; step s2_alter_tbl2_drop_3rd_col: <... completed> step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -493,7 +472,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; step s1_commit: COMMIT; step s2_alter_tbl2_drop_3rd_col: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -506,7 +485,7 @@ step s2_alter_tbl2_3rd_char: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character v step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; step s2_alter_tbl2_3rd_char: <... completed> -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -515,14 +494,9 @@ table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1' COMMIT step s2_alter_tbl2_3rd_int: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data -BEGIN -table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:null -table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 -table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 -COMMIT BEGIN table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1 COMMIT @@ -544,7 +518,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; step s2_alter_tbl2_3rd_text: <... completed> step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -573,7 +547,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; step s2_alter_tbl2_3rd_char: <... completed> step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -601,7 +575,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -628,7 +602,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); step s1_commit: COMMIT; step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1); -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN @@ -653,7 +627,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); step s1_commit: COMMIT; -step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data BEGIN diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 1e22c1eefc..b7c76469fc 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -117,11 +117,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc (22 rows) ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4); --- throw away changes, they contain oids +-- check that this doesn't produce any changes from the heap rewrite SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); count ------- - 12 + 0 (1 row) INSERT INTO replication_example(somedata, somenum) VALUES (5, 1); @@ -192,16 +192,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc COMMIT (33 rows) --- hide changes bc of oid visible in full table rewrites CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int); INSERT INTO tr_unique(data) VALUES(10); ALTER TABLE tr_unique RENAME TO tr_pkey; ALTER TABLE tr_pkey ADD COLUMN id serial primary key; -SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - count -------- - 6 -(1 row) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1'); + data +----------------------------------------------------------------------------- + BEGIN + table public.tr_unique: INSERT: id2[integer]:1 data[integer]:10 + COMMIT + BEGIN + table public.tr_pkey: INSERT: id2[integer]:1 data[integer]:10 id[integer]:1 + COMMIT +(6 rows) INSERT INTO tr_pkey(data) VALUES(1); --show deletion with primary key diff --git a/contrib/test_decoding/specs/concurrent_ddl_dml.spec b/contrib/test_decoding/specs/concurrent_ddl_dml.spec index 4a76532402..e7cea37d30 100644 --- a/contrib/test_decoding/specs/concurrent_ddl_dml.spec +++ b/contrib/test_decoding/specs/concurrent_ddl_dml.spec @@ -53,7 +53,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; } step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; } -step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } +step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index 057dae056b..c4b10a4cf9 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -67,7 +67,7 @@ CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varch SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4); --- throw away changes, they contain oids +-- check that this doesn't produce any changes from the heap rewrite SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); INSERT INTO replication_example(somedata, somenum) VALUES (5, 1); @@ -93,12 +93,11 @@ CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varch /* display results */ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); --- hide changes bc of oid visible in full table rewrites CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int); INSERT INTO tr_unique(data) VALUES(10); ALTER TABLE tr_unique RENAME TO tr_pkey; ALTER TABLE tr_pkey ADD COLUMN id serial primary key; -SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1'); INSERT INTO tr_pkey(data) VALUES(1); --show deletion with primary key diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 0f18afa852..d12b0b4adc 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -111,6 +111,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ctx->output_plugin_private = data; opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; + opt->receive_rewrites = false; foreach(option, ctx->output_plugin_options) { @@ -176,6 +177,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-rewrites") == 0) + { + + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -422,6 +434,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, quote_qualified_identifier( get_namespace_name( get_rel_namespace(RelationGetRelid(relation))), + class_form->relrewrite ? + get_rel_name(class_form->relrewrite) : NameStr(class_form->relname))); appendStringInfoChar(ctx->out, ':'); diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 71e20f2740..a1ec5dcf47 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -1932,6 +1932,18 @@ <structname>pg_class</structname> Columns True if table is a partition + + relrewrite + oid + pg_class.oid + + For new relations being written during a DDL operation that requires a + table rewrite, this contains the OID of the original relation; + otherwise 0. That state is only visible internally. This field should + never contain anything other than 0 for a user-visible relation. + + + relfrozenxid xid diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5501eed108..ef4a51c123 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -486,12 +486,17 @@ Startup Callback typedef struct OutputPluginOptions { OutputPluginOutputType output_type; + bool receive_rewrites; } OutputPluginOptions; output_type has to either be set to OUTPUT_PLUGIN_TEXTUAL_OUTPUT or OUTPUT_PLUGIN_BINARY_OUTPUT. See also . + If receive_rewrites is true, the output plugin will + also be called for changes made by heap rewrites during certain DDL + operations. These are of interest to plugins that handle DDL + replication but require special handling. diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index 9e81f9514d..6998ac20fc 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -257,6 +257,7 @@ Boot_CreateStmt: false, true, false, + InvalidOid, NULL); elog(DEBUG4, "relation created with OID %u", id); } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index cf36ce4add..fb1aa70c81 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -807,6 +807,7 @@ InsertPgClassTuple(Relation pg_class_desc, values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated); values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident); values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition); + values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite); values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid); values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid); if (relacl != (Datum) 0) @@ -1039,6 +1040,7 @@ heap_create_with_catalog(const char *relname, bool use_user_acl, bool allow_system_table_mods, bool is_internal, + Oid relrewrite, ObjectAddress *typaddress) { Relation pg_class_desc; @@ -1177,6 +1179,8 @@ heap_create_with_catalog(const char *relname, Assert(relid == RelationGetRelid(new_rel_desc)); + new_rel_desc->rd_rel->relrewrite = relrewrite; + /* * Decide whether to create an array type over the relation's rowtype. We * do not create any array types for system catalogs (ie, those made diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c index 8bf2698545..c4515e6c1d 100644 --- a/src/backend/catalog/toasting.c +++ b/src/backend/catalog/toasting.c @@ -279,6 +279,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, false, true, true, + InvalidOid, NULL); Assert(toast_relid != InvalidOid); diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 5d481dd50d..6ddf8773b3 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -692,6 +692,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence, false, true, true, + OIDOldHeap, NULL); Assert(OIDNewHeap != InvalidOid); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 74e020bffc..b3e57d847c 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -764,6 +764,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, true, allowSystemTableMods, false, + InvalidOid, typaddress); /* Store inheritance information for new rel. */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7637efc32e..e2e39f4577 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -317,6 +317,8 @@ CreateInitDecodingContext(char *plugin, startup_cb_wrapper(ctx, &ctx->options, true); MemoryContextSwitchTo(old_context); + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; + return ctx; } @@ -410,6 +412,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, startup_cb_wrapper(ctx, &ctx->options, false); MemoryContextSwitchTo(old_context); + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; + ereport(LOG, (errmsg("starting logical decoding for slot \"%s\"", NameStr(slot->data.name)), diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c72a611a39..8a4aa90345 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1386,6 +1386,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (!RelationIsLogicallyLogged(relation)) goto change_done; + /* + * Ignore temporary heaps created during DDL unless the + * plugin has asked for them. + */ + if (relation->rd_rel->relrewrite && !rb->output_rewrites) + goto change_done; + /* * For now ignore sequence changes entirely. Most of the * time they don't log changes using records we diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d538f25ede..aa9cf5b54e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -21,7 +21,6 @@ #include "utils/inval.h" #include "utils/int8.h" -#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -511,31 +510,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); - /* - * Skip tables that look like they are from a heap rewrite (see - * make_new_heap()). We need to skip them because the subscriber - * won't have a table by that name to receive the data. That - * means we won't ship the new data in, say, an added column with - * a DEFAULT, but if the user applies the same DDL manually on the - * subscriber, then this will work out for them. - * - * We only need to consider the alltables case, because such a - * transient heap won't be an explicit member of a publication. - */ - if (pub->alltables) - { - char *relname = get_rel_name(relid); - unsigned int u; - int n; - - if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 && - relname[n] == '\0') - { - if (get_rel_relkind(u) == RELKIND_RELATION) - break; - } - } - if (pub->alltables || list_member_oid(pubids, pub->oid)) { entry->pubactions.pubinsert |= pub->pubactions.pubinsert; diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 9bdc63ceb5..3308fa3dfd 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -71,6 +71,7 @@ extern Oid heap_create_with_catalog(const char *relname, bool use_user_acl, bool allow_system_table_mods, bool is_internal, + Oid relrewrite, ObjectAddress *typaddress); extern void heap_create_init_fork(Relation rel); diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h index 26b1866c69..7c49e7a019 100644 --- a/src/include/catalog/pg_class.h +++ b/src/include/catalog/pg_class.h @@ -71,6 +71,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO bool relispopulated; /* matview currently holds query results */ char relreplident; /* see REPLICA_IDENTITY_xxx constants */ bool relispartition; /* is relation a partition? */ + Oid relrewrite; /* heap for rewrite during DDL, link to original rel */ TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */ TransactionId relminmxid; /* all multixacts in this rel are >= this. * this is really a MultiXactId */ @@ -99,7 +100,7 @@ typedef FormData_pg_class *Form_pg_class; * ---------------- */ -#define Natts_pg_class 33 +#define Natts_pg_class 34 #define Anum_pg_class_relname 1 #define Anum_pg_class_relnamespace 2 #define Anum_pg_class_reltype 3 @@ -128,11 +129,12 @@ typedef FormData_pg_class *Form_pg_class; #define Anum_pg_class_relispopulated 26 #define Anum_pg_class_relreplident 27 #define Anum_pg_class_relispartition 28 -#define Anum_pg_class_relfrozenxid 29 -#define Anum_pg_class_relminmxid 30 -#define Anum_pg_class_relacl 31 -#define Anum_pg_class_reloptions 32 -#define Anum_pg_class_relpartbound 33 +#define Anum_pg_class_relrewrite 29 +#define Anum_pg_class_relfrozenxid 30 +#define Anum_pg_class_relminmxid 31 +#define Anum_pg_class_relacl 32 +#define Anum_pg_class_reloptions 33 +#define Anum_pg_class_relpartbound 34 /* ---------------- * initial contents of pg_class @@ -147,13 +149,13 @@ typedef FormData_pg_class *Form_pg_class; * Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId; * similarly, "1" in relminmxid stands for FirstMultiXactId */ -DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); -DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); -DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); -DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 33 0 t f f f f f f t n f 3 1 _null_ _null_ _null_)); +DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 34 0 t f f f f f f t n f 0 3 1 _null_ _null_ _null_)); DESCR(""); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 78fd38bb16..82875d6b3d 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -26,6 +26,7 @@ typedef enum OutputPluginOutputType typedef struct OutputPluginOptions { OutputPluginOutputType output_type; + bool receive_rewrites; } OutputPluginOptions; /* diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0970abca52..6464722471 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -336,6 +336,8 @@ struct ReorderBuffer */ void *private_data; + bool output_rewrites; + /* * Private memory context. */ base-commit: 51057feaa6bd24b51e6a4715c2090491ef037534 -- 2.16.2