From 40f16e060194280a3ce345a1cde57e22a9892007 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Mon, 6 Mar 2017 13:07:45 +0100 Subject: [PATCH] Add option to modify sync commit per subscription This also changes default behaviour of subscription workers to synchronous_commit = off --- doc/src/sgml/catalogs.sgml | 10 ++++++ doc/src/sgml/ref/alter_subscription.sgml | 2 ++ doc/src/sgml/ref/create_subscription.sgml | 30 +++++++++++++++- src/backend/catalog/pg_subscription.c | 8 +++++ src/backend/commands/subscriptioncmds.c | 56 ++++++++++++++++++++++++------ src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/worker.c | 8 +++++ src/bin/pg_dump/pg_dump.c | 11 +++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 2 +- src/bin/psql/describe.c | 5 ++- src/include/catalog/pg_subscription.h | 7 ++-- src/test/regress/expected/subscription.out | 27 +++++++------- src/test/regress/sql/subscription.sql | 3 +- 14 files changed, 141 insertions(+), 31 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 5883673..2d878ad 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6531,6 +6531,16 @@ + subsynccommit + text + + + Contains value for synchronous_commit setting of the + subscription workers. + + + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 640fac0..f71ee38 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION name WITH ( where suboption can be: SLOT NAME = slot_name + | SYNCHRONOUS_COMMIT = synchronous_commit ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] { REFRESH WITH ( puboption [, ... ] ) | NOREFRESH } ALTER SUBSCRIPTION name REFRESH PUBLICATION WITH ( puboption [, ... ] ) @@ -91,6 +92,7 @@ ALTER SUBSCRIPTION name DISABLE CONNECTION 'conninfo' SLOT NAME = slot_name + SYNCHRONOUS_COMMIT = synchronous_commit These clauses alter properties originally set by diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 3410d6f..3fc010e 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -32,6 +32,7 @@ CREATE SUBSCRIPTION subscription_nameslot_name | COPY DATA | NOCOPY DATA + | SYNCHRONOUS_COMMIT = synchronous_commit | NOCONNECT @@ -148,7 +149,34 @@ CREATE SUBSCRIPTION subscription_name - NOCONNECT + SYNCHRONOUS_COMMIT = synchronous_commit + + + The value of this parameter overrides the + setting. The default value is + off. + + + It's safe to use off for logical replication because + in case of transaction loss on subscriber due to missing sync, the data + will be resent from publisher. + + + The logical replication worker will also report position of the writes + and flushes to the publisher so the publisher will wait on the actual + flush when doing synchronous logical replication. This however means + that setting the SYNCHRONOUS_COMMIT for subscriber + to off when the subscription is used for synchronous + replication may increase the latency for COMMIT on + the publisher. In this scenario it may be advantageous to set the + SYNCHRONOUS_COMMIT to local or + higher. + + + + + + NOCONNECT Instructs CREATE SUBSCRIPTION to skip the initial diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index f5ba9f6..4f294d3 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + /* Get syncommit */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subsynccommit, + &isnull); + Assert(!isnull); + sub->synccommit = pstrdup(TextDatumGetCString(datum)); + /* Get publications */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5bf8d93..a7f4649 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,6 +44,7 @@ #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, - bool *copy_data) + bool *copy_data, char **synchronous_commit) { ListCell *lc; bool connect_given = false; @@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *slot_name = NULL; if (copy_data) *copy_data = true; + if (synchronous_commit) + *synchronous_commit = NULL; /* Parse options */ foreach (lc, options) @@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, copy_data_given = true; *copy_data = !defGetBoolean(defel); } + else if (strcmp(defel->defname, "synchronous_commit") == 0 && + synchronous_commit) + { + if (*synchronous_commit) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *synchronous_commit = defGetString(defel); + + /* Test if the given value is valid for synchronous_commit GUC. */ + (void) set_config_option("synchronous_commit", *synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + char *synchronous_commit; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data); + &enabled, &create_slot, &slotname, ©_data, + &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ + if (synchronous_commit == NULL) + synchronous_commit = "off"; conninfo = stmt->conninfo; publications = stmt->publication; @@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) CStringGetTextDatum(conninfo); values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slotname)); + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); @@ -581,14 +606,25 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slot_name; + char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL); + NULL, &slot_name, NULL, + &synchronous_commit); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + if (synchronous_commit) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum(synchronous_commit); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } update_tuple = true; break; @@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL); + NULL, NULL, NULL); Assert(enabled_given); values[Anum_pg_subscription_subenabled - 1] = @@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7ba239c..47c88d3 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -129,7 +129,7 @@ get_subscription_list(void) */ oldcxt = MemoryContextSwitchTo(resultcxt); - sub = (Subscription *) palloc(sizeof(Subscription)); + sub = (Subscription *) palloc0(sizeof(Subscription)); sub->oid = HeapTupleGetOid(tup); sub->dbid = subform->subdbid; sub->owner = subform->subowner; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3313448..29b6c6a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1416,6 +1416,10 @@ reread_subscription(void) MemoryContextSwitchTo(oldctx); + /* Change synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", MySubscription->synccommit, + PGC_BACKEND, PGC_S_OVERRIDE); + if (started_tx) CommitTransactionCommand(); @@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg) MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); + /* Setup synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", MySubscription->synccommit, + PGC_BACKEND, PGC_S_OVERRIDE); + if (!MySubscription->enabled) { ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 14dc1b2..d72a88d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3669,6 +3669,7 @@ getSubscriptions(Archive *fout) int i_rolname; int i_subconninfo; int i_subslotname; + int i_subsynccommit; int i_subpublications; int i, ntups; @@ -3700,7 +3701,8 @@ getSubscriptions(Archive *fout) appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname," "(%s s.subowner) AS rolname, " - " s.subconninfo, s.subslotname, s.subpublications " + " s.subconninfo, s.subslotname, s.subsynccommit, " + " s.subpublications " "FROM pg_catalog.pg_subscription s " "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" " WHERE datname = current_database())", @@ -3715,6 +3717,7 @@ getSubscriptions(Archive *fout) i_rolname = PQfnumber(res, "rolname"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); + i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subpublications = PQfnumber(res, "subpublications"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -3730,6 +3733,8 @@ getSubscriptions(Archive *fout) subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname)); + subinfo[i].subsynccommit = + pg_strdup(PQgetvalue(res, i, i_subsynccommit)); subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); @@ -3794,6 +3799,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data); appendStringLiteralAH(query, subinfo->subslotname, fout); + + appendPQExpBufferStr(query, ", SYNCHRONOUS_COMMIT = "); + appendStringLiteralAH(query, subinfo->subsynccommit, fout); + appendPQExpBufferStr(query, ");\n"); ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index ba85392..471cfce 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo char *rolname; char *subconninfo; char *subslotname; + char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index e0d1ce6..1adacf4 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -4305,7 +4305,7 @@ qr/CREATE TRANSFORM FOR integer LANGUAGE sql \(FROM SQL WITH FUNCTION pg_catalog CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 WITH (NOCONNECT);', regexp => qr/^ - \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (NOCONNECT, SLOT NAME = 'sub1');\E + \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (NOCONNECT, SLOT NAME = 'sub1', SYNCHRONOUS_COMMIT = 'off');\E /xm, like => { binary_upgrade => 1, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 696b153..a220abb 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, + false, false}; if (pset.sversion < 100000) { @@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { appendPQExpBuffer(&buf, + ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", + gettext_noop("Synchronous Commit"), gettext_noop("Conninfo")); } diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0811880..ba9251a 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -44,6 +44,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE text subconninfo; /* Connection string to the publisher */ NameData subslotname; /* Slot name on publisher */ + text subsynccommit; /* Synchronous commit setting for worker */ text subpublications[1]; /* List of publications subscribed to */ #endif } FormData_pg_subscription; @@ -54,14 +55,15 @@ typedef FormData_pg_subscription *Form_pg_subscription; * compiler constants for pg_subscription * ---------------- */ -#define Natts_pg_subscription 7 +#define Natts_pg_subscription 8 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 #define Anum_pg_subscription_subenabled 4 #define Anum_pg_subscription_subconninfo 5 #define Anum_pg_subscription_subslotname 6 -#define Anum_pg_subscription_subpublications 7 +#define Anum_pg_subscription_subsynccommit 7 +#define Anum_pg_subscription_subpublications 8 typedef struct Subscription @@ -73,6 +75,7 @@ typedef struct Subscription bool enabled; /* Indicates if the subscription is enabled */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ + char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ } Subscription; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 8760d59..140134e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -46,10 +46,10 @@ CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WI ERROR: must be superuser to create subscriptions SET SESSION AUTHORIZATION 'regress_subscription_user'; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+-------------+--------------------- - testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+-------------+--------------------+--------------------- + testsub | regress_subscription_user | f | {testpub} | off | dbname=doesnotexist (1 row) ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH; @@ -59,10 +59,10 @@ ALTER SUBSCRIPTION testsub WITH (SLOT NAME = 'newname'); ALTER SUBSCRIPTION doesnotexist CONNECTION 'dbname=doesnotexist2'; ERROR: subscription "doesnotexist" does not exist \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+---------------------+---------------------- - testsub | regress_subscription_user | f | {testpub2,testpub3} | dbname=doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub | regress_subscription_user | f | {testpub2,testpub3} | off | dbname=doesnotexist2 (1 row) BEGIN; @@ -89,11 +89,12 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; ERROR: must be owner of subscription testsub RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; -\dRs - List of subscriptions - Name | Owner | Enabled | Publication --------------+---------------------------+---------+--------------------- - testsub_foo | regress_subscription_user | f | {testpub2,testpub3} +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +-------------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | true | dbname=doesnotexist2 (1 row) -- rename back to keep the rest simple diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 7bdc2b3..39420ef 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -66,8 +66,9 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); -\dRs +\dRs+ -- rename back to keep the rest simple ALTER SUBSCRIPTION testsub_foo RENAME TO testsub; -- 2.7.4