From 129547a473edd42fa66f1490d36f8aaede529867 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 18 Nov 2019 11:51:04 +0530 Subject: [PATCH 10/17] Support logical_decoding_work_mem set from create subscription command --- doc/src/sgml/config.sgml | 21 +++++++++ doc/src/sgml/ref/create_subscription.sgml | 12 +++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 44 ++++++++++++++++--- .../libpqwalreceiver/libpqwalreceiver.c | 3 ++ src/backend/replication/logical/worker.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 30 ++++++++++++- src/include/catalog/pg_subscription.h | 3 ++ src/include/replication/walreceiver.h | 1 + 9 files changed, 108 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5d1c90282f..8b1923c9de 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1751,6 +1751,27 @@ include_dir 'conf.d' + + logical_decoding_work_mem (integer) + + logical_decoding_work_mem configuration parameter + + + + + Specifies the maximum amount of memory to be used by logical decoding, + before some of the decoded changes are either written to local disk. + This limits the amount of memory used by logical streaming replication + connections. It defaults to 64 megabytes (64MB). + Since each replication connection only uses a single buffer of this size, + and an installation normally doesn't have many such connections + concurrently (as limited by max_wal_senders), it's + safe to set this value significantly higher than work_mem, + reducing the amount of decoded changes written to disk. + + + + max_stack_depth (integer) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 1a90c244fb..91790b0c95 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -206,6 +206,18 @@ CREATE SUBSCRIPTION subscription_name + + + work_mem (integer) + + + Limits the amount of memory used to decode changes on the + publisher. If not specified, the publisher will use the default + specified by logical_decoding_work_mem. When + needed, additional data are spilled to disk. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 68d88ff499..2a276482c1 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->workmem = subform->subworkmem; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5408edcfc2..fbb447379f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -58,7 +58,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *refresh, int *logical_wm, + bool *logical_wm_given) { ListCell *lc; bool connect_given = false; @@ -89,6 +90,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (logical_wm) + *logical_wm_given = false; /* Parse options */ foreach(lc, options) @@ -174,6 +177,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "work_mem") == 0 && logical_wm) + { + if (*logical_wm_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *logical_wm_given = true; + *logical_wm = defGetInt32(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -317,6 +330,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + int logical_wm; + bool logical_wm_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -333,7 +348,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, &slotname, ©_data, &synchronous_commit, - NULL); + NULL, &logical_wm, &logical_wm_given); /* * Since creating a replication slot is not transactional, rolling back @@ -411,6 +426,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + if (logical_wm_given) + values[Anum_pg_subscription_subworkmem - 1] = + Int32GetDatum(logical_wm); + else + nulls[Anum_pg_subscription_subworkmem - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -669,10 +690,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; + int logical_wm; + bool logical_wm_given; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + NULL, &synchronous_commit, NULL, + &logical_wm, &logical_wm_given); if (slotname_given) { @@ -697,6 +721,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (logical_wm_given) + { + values[Anum_pg_subscription_subworkmem - 1] = + Int32GetDatum(logical_wm); + replaces[Anum_pg_subscription_subworkmem - 1] = true; + } + update_tuple = true; break; } @@ -708,7 +739,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL, + NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -746,7 +778,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, &refresh); + NULL, &refresh, NULL, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -783,7 +815,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, NULL); + NULL, NULL, NULL, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 545d2fcd05..0ab6855ad8 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -406,6 +406,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, "proto_version '%u'", options->proto.logical.proto_version); + appendStringInfo(&cmd, ", work_mem '%d'", + options->proto.logical.work_mem); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 63ba0ae234..c80acd3eb0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1729,6 +1729,7 @@ ApplyWorkerMain(Datum main_arg) options.slotname = myslotname; options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; + options.proto.logical.work_mem = MySubscription->workmem; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3483c1b877..cf6e03b9a7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -18,6 +18,7 @@ #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/guc.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/memutils.h" @@ -87,11 +88,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names) + List **publication_names, int *logical_decoding_work_mem) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; + bool work_mem_given = false; foreach(lc, options) { @@ -137,6 +139,29 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } + else if (strcmp(defel->defname, "work_mem") == 0) + { + int64 parsed; + + if (work_mem_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + work_mem_given = true; + + if (!scanint8(strVal(defel->arg), true, &parsed)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid work_mem"))); + + if (parsed > PG_INT32_MAX || parsed < 64) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("work_mem \"%s\" out of range", + strVal(defel->arg)))); + + *logical_decoding_work_mem = (int)parsed; + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -171,7 +196,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Parse the params and ERROR if we see any we don't recognize */ parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, - &data->publication_names); + &data->publication_names, + &logical_decoding_work_mem); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 3cb13d897e..10ea113e4d 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -48,6 +48,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + int32 subworkmem; /* Memory to use to decode changes. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -73,6 +75,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + int workmem; /* Memory to decode changes. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 41714eaf0c..1db706af54 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -162,6 +162,7 @@ typedef struct { uint32 proto_version; /* Logical protocol version */ List *publication_names; /* String list of publications */ + int work_mem; /* Memory limit to use for decoding */ } logical; } proto; } WalRcvStreamOptions; -- 2.21.0