From 83398a4d454e579be9d251a8c60d9b0caf3140b3 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Fri, 13 Jan 2023 20:32:44 +0100 Subject: [PATCH v202311272 2/4] Add decoding of sequences to test_decoding Extends test_decoding with callbacks to decode sequences. This feature is disabled by default (i.e. sequence changes are omitted from the output), but may be enabled by passing 'include-sequences' option to the test_decoding plugin. Author: Tomas Vondra, Cary Huang Reviewed-by: Ashutosh Bapat, Amit Kapila, Peter Eisentraut, Hannu Krosing, Andres Freund, Zhijie Hou Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca --- contrib/test_decoding/Makefile | 3 +- contrib/test_decoding/expected/sequence.out | 325 ++++++++++++++++++++ contrib/test_decoding/sql/sequence.sql | 119 +++++++ contrib/test_decoding/test_decoding.c | 87 ++++++ 4 files changed, 533 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/sequence.out create mode 100644 contrib/test_decoding/sql/sequence.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..05c0c5a2f8 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,8 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream \ + sequence ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out new file mode 100644 index 0000000000..98a8f25e5a --- /dev/null +++ b/contrib/test_decoding/expected/sequence.out @@ -0,0 +1,325 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE SEQUENCE test_sequence; +-- test the sequence changes by several nextval() calls +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 2 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 4 +(1 row) + +-- test the sequence changes by several ALTER commands +ALTER SEQUENCE test_sequence INCREMENT BY 10; +SELECT nextval('test_sequence'); + nextval +--------- + 14 +(1 row) + +ALTER SEQUENCE test_sequence START WITH 3000; +ALTER SEQUENCE test_sequence MAXVALUE 10000; +ALTER SEQUENCE test_sequence RESTART WITH 4000; +SELECT nextval('test_sequence'); + nextval +--------- + 4000 +(1 row) + +-- test the sequence changes by several setval() calls +SELECT setval('test_sequence', 3500); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3510 +(1 row) + +SELECT setval('test_sequence', 3500, true); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3510 +(1 row) + +SELECT setval('test_sequence', 3500, false); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3500 +(1 row) + +-- show results and drop sequence +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + data +------------------------------------------------------------- + BEGIN + sequence public.test_sequence: transactional: 1 value: 1 + COMMIT + sequence public.test_sequence: transactional: 0 value: 33 + BEGIN + sequence public.test_sequence: transactional: 1 value: 4 + COMMIT + sequence public.test_sequence: transactional: 0 value: 334 + BEGIN + sequence public.test_sequence: transactional: 1 value: 46 + COMMIT + BEGIN + sequence public.test_sequence: transactional: 1 value: 14 + COMMIT + BEGIN + sequence public.test_sequence: transactional: 1 value: 4000 + COMMIT + sequence public.test_sequence: transactional: 0 value: 4320 + sequence public.test_sequence: transactional: 0 value: 3500 + sequence public.test_sequence: transactional: 0 value: 3830 + sequence public.test_sequence: transactional: 0 value: 3500 + sequence public.test_sequence: transactional: 0 value: 3830 + sequence public.test_sequence: transactional: 0 value: 3500 + sequence public.test_sequence: transactional: 0 value: 3820 +(24 rows) + +DROP SEQUENCE test_sequence; +-- rollback on sequence creation and update +BEGIN; +CREATE SEQUENCE test_sequence; +CREATE TABLE test_table (a INT); +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 2 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3 +(1 row) + +SELECT setval('test_sequence', 3000); + setval +-------- + 3000 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3001 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3002 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3003 +(1 row) + +ALTER SEQUENCE test_sequence RESTART WITH 6000; +INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) ); +SELECT nextval('test_sequence'); + nextval +--------- + 6001 +(1 row) + +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + data +------ +(0 rows) + +-- rollback on table creation with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); + setval +-------- + 3000 +(1 row) + +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + data +------ +(0 rows) + +-- rollback on table with serial column +CREATE TABLE test_table (a SERIAL, b INT); +BEGIN; +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); + setval +-------- + 3000 +(1 row) + +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +-- check table and sequence values after rollback +SELECT * from test_table_a_seq; + last_value | log_cnt | is_called +------------+---------+----------- + 3003 | 30 | t +(1 row) + +SELECT nextval('test_table_a_seq'); + nextval +--------- + 3004 +(1 row) + +DROP TABLE test_table; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + data +---------------------------------------------------------------- + BEGIN + sequence public.test_table_a_seq: transactional: 1 value: 1 + COMMIT + sequence public.test_table_a_seq: transactional: 0 value: 33 + sequence public.test_table_a_seq: transactional: 0 value: 3000 + sequence public.test_table_a_seq: transactional: 0 value: 3033 +(6 rows) + +-- savepoint test on table with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +SAVEPOINT a; +INSERT INTO test_table (b) VALUES (300); +ROLLBACK TO SAVEPOINT a; +DROP TABLE test_table; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + data +-------------------------------------------------------------- + BEGIN + sequence public.test_table_a_seq: transactional: 1 value: 1 + sequence public.test_table_a_seq: transactional: 1 value: 33 + table public.test_table: INSERT: a[integer]:1 b[integer]:100 + table public.test_table: INSERT: a[integer]:2 b[integer]:200 + COMMIT +(6 rows) + +-- savepoint test on table with serial column +BEGIN; +CREATE SEQUENCE test_sequence; +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT setval('test_sequence', 3000); + setval +-------- + 3000 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3001 +(1 row) + +SAVEPOINT a; +ALTER SEQUENCE test_sequence START WITH 7000; +SELECT setval('test_sequence', 5000); + setval +-------- + 5000 +(1 row) + +ROLLBACK TO SAVEPOINT a; +SELECT * FROM test_sequence; + last_value | log_cnt | is_called +------------+---------+----------- + 3001 | 32 | t +(1 row) + +DROP SEQUENCE test_sequence; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + data +------------------------------------------------------------- + BEGIN + sequence public.test_sequence: transactional: 1 value: 1 + sequence public.test_sequence: transactional: 1 value: 33 + sequence public.test_sequence: transactional: 1 value: 3000 + sequence public.test_sequence: transactional: 1 value: 3033 + COMMIT +(6 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql new file mode 100644 index 0000000000..c19a5d5074 --- /dev/null +++ b/contrib/test_decoding/sql/sequence.sql @@ -0,0 +1,119 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE SEQUENCE test_sequence; + +-- test the sequence changes by several nextval() calls +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); + +-- test the sequence changes by several ALTER commands +ALTER SEQUENCE test_sequence INCREMENT BY 10; +SELECT nextval('test_sequence'); + +ALTER SEQUENCE test_sequence START WITH 3000; +ALTER SEQUENCE test_sequence MAXVALUE 10000; +ALTER SEQUENCE test_sequence RESTART WITH 4000; +SELECT nextval('test_sequence'); + +-- test the sequence changes by several setval() calls +SELECT setval('test_sequence', 3500); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3500, true); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3500, false); +SELECT nextval('test_sequence'); + +-- show results and drop sequence +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); +DROP SEQUENCE test_sequence; + +-- rollback on sequence creation and update +BEGIN; +CREATE SEQUENCE test_sequence; +CREATE TABLE test_table (a INT); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3000); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +ALTER SEQUENCE test_sequence RESTART WITH 6000; +INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) ); +SELECT nextval('test_sequence'); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + +-- rollback on table creation with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + +-- rollback on table with serial column +CREATE TABLE test_table (a SERIAL, b INT); + +BEGIN; +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; + +-- check table and sequence values after rollback +SELECT * from test_table_a_seq; +SELECT nextval('test_table_a_seq'); + +DROP TABLE test_table; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + +-- savepoint test on table with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +SAVEPOINT a; +INSERT INTO test_table (b) VALUES (300); +ROLLBACK TO SAVEPOINT a; +DROP TABLE test_table; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + +-- savepoint test on table with serial column +BEGIN; +CREATE SEQUENCE test_sequence; +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3000); +SELECT nextval('test_sequence'); +SAVEPOINT a; +ALTER SEQUENCE test_sequence START WITH 7000; +SELECT setval('test_sequence', 5000); +ROLLBACK TO SAVEPOINT a; +SELECT * FROM test_sequence; +DROP SEQUENCE test_sequence; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 288fd0bb4a..0a781edffc 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -29,6 +29,7 @@ typedef struct MemoryContext context; bool include_xids; bool include_timestamp; + bool include_sequences; bool skip_empty_xacts; bool only_local; } TestDecodingData; @@ -72,6 +73,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, + int64 value); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); @@ -112,6 +117,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, + int64 value); static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], @@ -135,6 +144,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->sequence_cb = pg_decode_sequence; cb->filter_prepare_cb = pg_decode_filter_prepare; cb->begin_prepare_cb = pg_decode_begin_prepare_txn; cb->prepare_cb = pg_decode_prepare_txn; @@ -147,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; + cb->stream_sequence_cb = pg_decode_stream_sequence; cb->stream_truncate_cb = pg_decode_stream_truncate; } @@ -168,6 +179,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->include_sequences = false; ctx->output_plugin_private = data; @@ -259,6 +271,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-sequences") == 0) + { + + if (elem->arg == NULL) + data->include_sequences = true; + else if (!parse_bool(strVal(elem->arg), &data->include_sequences)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -762,6 +785,38 @@ pg_decode_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void +pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 value) +{ + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + + if (!data->include_sequences) + return; + + /* output BEGIN if we haven't yet, but only for the transactional case */ + if (transactional) + { + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) + { + pg_output_begin(ctx, data, txn, false); + } + txndata->xact_wrote_changes = true; + } + + OutputPluginPrepareWrite(ctx, true); + appendStringInfoString(ctx->out, "sequence "); + appendStringInfoString(ctx->out, + quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))), + RelationGetRelationName(rel))); + appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT, + transactional, value); + OutputPluginWrite(ctx, true); +} + static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) @@ -974,6 +1029,38 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void +pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 value) +{ + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + + if (!data->include_sequences) + return; + + /* output BEGIN if we haven't yet, but only for the transactional case */ + if (transactional) + { + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + txndata->xact_wrote_changes = true; + } + + OutputPluginPrepareWrite(ctx, true); + appendStringInfoString(ctx->out, "streaming sequence "); + appendStringInfoString(ctx->out, + quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))), + RelationGetRelationName(rel))); + appendStringInfo(ctx->out, ": transactional: %d value: " INT64_FORMAT, + transactional, value); + OutputPluginWrite(ctx, true); +} + /* * In streaming mode, we don't display the detailed information of Truncate. * See pg_decode_stream_change. -- 2.27.0