Re: Logical Replication WIP

From: Andres Freund <andres(at)anarazel(dot)de>
To: Petr Jelinek <petr(dot)jelinek(at)2ndquadrant(dot)com>
Cc: Peter Eisentraut <peter(dot)eisentraut(at)2ndquadrant(dot)com>, Erik Rijkers <er(at)xs4all(dot)nl>, Steve Singer <steve(at)ssinger(dot)info>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Simon Riggs <simon(at)2ndquadrant(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>, pgsql-hackers-owner(at)postgresql(dot)org
Subject: Re: Logical Replication WIP
Date: 2016-12-13 01:41:55
Message-ID: 20161213014155.pv5xkw7rerat5paq@alap3.anarazel.de
Lists: pgsql-hackers

On 2016-12-10 08:48:55 +0100, Petr Jelinek wrote:

> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
> new file mode 100644
> index 0000000..e3560b7
> --- /dev/null
> +++ b/src/backend/catalog/pg_publication.c
> +
> +Datum pg_get_publication_tables(PG_FUNCTION_ARGS);

Don't we usually put these in a header?

> +/*
> + * Insert new publication / relation mapping.
> + */
> +ObjectAddress
> +publication_add_relation(Oid pubid, Relation targetrel,
> + bool if_not_exists)
> +{
> + Relation rel;
> + HeapTuple tup;
> + Datum values[Natts_pg_publication_rel];
> + bool nulls[Natts_pg_publication_rel];
> + Oid relid = RelationGetRelid(targetrel);
> + Oid prrelid;
> + Publication *pub = GetPublication(pubid);
> + ObjectAddress myself,
> + referenced;
> +
> + rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
> +
> + /* Check for duplicates */

Maybe mention that that check is racy, but a unique index protects
against the race?

> + /* Insert tuple into catalog. */
> + prrelid = simple_heap_insert(rel, tup);
> + CatalogUpdateIndexes(rel, tup);
> + heap_freetuple(tup);
> +
> + ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
> +
> + /* Add dependency on the publication */
> + ObjectAddressSet(referenced, PublicationRelationId, pubid);
> + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
> +
> + /* Add dependency on the relation */
> + ObjectAddressSet(referenced, RelationRelationId, relid);
> + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
> +
> + /* Close the table. */
> + heap_close(rel, RowExclusiveLock);

I'm not quite sure about the policy, but shouldn't we invoke
InvokeObjectPostCreateHook etc. here?

> +/*
> + * Gets list of relation oids for a publication.
> + *
> + * This should only be used for normal publications, the FOR ALL TABLES
> + * should use GetAllTablesPublicationRelations().
> + */
> +List *
> +GetPublicationRelations(Oid pubid)
> +{
> + List *result;
> + Relation pubrelsrel;
> + ScanKeyData scankey;
> + SysScanDesc scan;
> + HeapTuple tup;
> +
> + /* Find all publications associated with the relation. */
> + pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
> +
> + ScanKeyInit(&scankey,
> + Anum_pg_publication_rel_prpubid,
> + BTEqualStrategyNumber, F_OIDEQ,
> + ObjectIdGetDatum(pubid));
> +
> + scan = systable_beginscan(pubrelsrel, PublicationRelMapIndexId, true,
> + NULL, 1, &scankey);
> +
> + result = NIL;
> + while (HeapTupleIsValid(tup = systable_getnext(scan)))
> + {
> + Form_pg_publication_rel pubrel;
> +
> + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
> +
> + result = lappend_oid(result, pubrel->prrelid);
> + }
> +
> + systable_endscan(scan);
> + heap_close(pubrelsrel, NoLock);

In other parts of this you drop the lock, but not here?

> + heap_close(rel, NoLock);
> +
> + return result;
> +}

and here.

> +/*
> + * Gets list of all relation published by FOR ALL TABLES publication(s).
> + */
> +List *
> +GetAllTablesPublicationRelations(void)
> +{
> + Relation classRel;
> + ScanKeyData key[1];
> + HeapScanDesc scan;
> + HeapTuple tuple;
> + List *result = NIL;
> +
> + classRel = heap_open(RelationRelationId, AccessShareLock);

> + heap_endscan(scan);
> + heap_close(classRel, AccessShareLock);
> +
> + return result;
> +}

but here.

Btw, why are matviews not publishable?

> +/*
> + * Get Publication using name.
> + */
> +Publication *
> +GetPublicationByName(const char *pubname, bool missing_ok)
> +{
> + Oid oid;
> +
> + oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
> + if (!OidIsValid(oid))
> + {
> + if (missing_ok)
> + return NULL;
> +
> + ereport(ERROR,
> + (errcode(ERRCODE_UNDEFINED_OBJECT),
> + errmsg("publication \"%s\" does not exist", pubname)));
> + }
> +
> + return GetPublication(oid);
> +}

That's racy... Also, shouldn't we specify how callers should deal with
the returned memory for functions returning Publication *?

> diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
> new file mode 100644
> index 0000000..954b2bd
> --- /dev/null
> +++ b/src/backend/commands/publicationcmds.c
> @@ -0,0 +1,613 @@

> +/*
> + * Create new publication.
> + */
> +ObjectAddress
> +CreatePublication(CreatePublicationStmt *stmt)
> +{
> + Relation rel;

> +
> + values[Anum_pg_publication_puballtables - 1] =
> + BoolGetDatum(stmt->for_all_tables);
> + values[Anum_pg_publication_pubinsert - 1] =
> + BoolGetDatum(publish_insert);
> + values[Anum_pg_publication_pubupdate - 1] =
> + BoolGetDatum(publish_update);
> + values[Anum_pg_publication_pubdelete - 1] =
> + BoolGetDatum(publish_delete);

I remain convinced that a different representation would be
better. There'll be more options over time (truncate, DDL at least).

> +static void
> +AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
> + HeapTuple tup)
> +{
> + bool publish_insert_given;
> + bool publish_update_given;
> + bool publish_delete_given;
> + bool publish_insert;
> + bool publish_update;
> + bool publish_delete;
> + ObjectAddress obj;
> +
> + parse_publication_options(stmt->options,
> + &publish_insert_given, &publish_insert,
> + &publish_update_given, &publish_update,
> + &publish_delete_given, &publish_delete);

You could pass it a struct instead...
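
A minimal sketch of what the struct-based interface might look like,
assuming each option needs a "was it given" flag plus a value as in the
quoted call. This is standalone C; PubOptionValue, PublishOptions and
the two helpers are hypothetical names, not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* One option: whether the user specified it, and the value if so. */
typedef struct PubOptionValue
{
	bool		given;
	bool		value;
} PubOptionValue;

/* Hypothetical replacement for the six output parameters. */
typedef struct PublishOptions
{
	PubOptionValue publish_insert;
	PubOptionValue publish_update;
	PubOptionValue publish_delete;
} PublishOptions;

/* Reset all options to "not given". */
static void
publish_options_init(PublishOptions *opts)
{
	assert(opts != NULL);
	memset(opts, 0, sizeof(*opts));
}

/* Keep the current setting unless the option was explicitly given. */
static bool
publish_option_resolve(PubOptionValue opt, bool current)
{
	return opt.given ? opt.value : current;
}
```

The parser would then take a single PublishOptions pointer, and adding
an option would not change its signature.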

> +static List *
> +OpenTableList(List *tables)
> +{
> + List *relids = NIL;
> + List *rels = NIL;
> + ListCell *lc;
> +
> + /*
> + * Open, share-lock, and check all the explicitly-specified relations
> + */
> + foreach(lc, tables)
> + {
> + RangeVar *rv = lfirst(lc);
> + Relation rel;
> + bool recurse = interpretInhOption(rv->inhOpt);
> + Oid myrelid;
> +
> + rel = heap_openrv(rv, ShareUpdateExclusiveLock);
> + myrelid = RelationGetRelid(rel);
> + /* filter out duplicates when user specifies "foo, foo" */
> + if (list_member_oid(relids, myrelid))
> + {
> + heap_close(rel, ShareUpdateExclusiveLock);
> + continue;
> + }

This is a quadratic algorithm - that could bite us... Not sure if we
need to care. If we want to fix it, one approach would be to use
RangeVarGetRelid() instead, and then do a qsort/deduplicate before
actually opening the relations.
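
The qsort/deduplicate step could look roughly like the following
standalone sketch. It works on a plain array and uses a local typedef
as a stand-in for PostgreSQL's Oid; sort_unique_oids and oid_cmp are
hypothetical helpers, and resolving names to OIDs is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid type */

/* qsort comparator for Oids; avoids subtraction to prevent wraparound. */
static int
oid_cmp(const void *a, const void *b)
{
	Oid			oa = *(const Oid *) a;
	Oid			ob = *(const Oid *) b;

	return (oa > ob) - (oa < ob);
}

/*
 * Sort the array in place and squeeze out duplicates.
 * Returns the number of distinct entries, which occupy oids[0..ret-1].
 * O(n log n) overall, versus O(n^2) for a list_member_oid() check
 * on every element.
 */
static size_t
sort_unique_oids(Oid *oids, size_t n)
{
	size_t		out;
	size_t		i;

	assert(oids != NULL || n == 0);

	if (n < 2)
		return n;

	qsort(oids, n, sizeof(Oid), oid_cmp);

	out = 1;
	for (i = 1; i < n; i++)
	{
		if (oids[i] != oids[out - 1])
			oids[out++] = oids[i];
	}
	return out;
}
```

One side effect to keep in mind: the relations would then be opened in
OID order rather than in the order the user listed them.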

>
> -def_elem: ColLabel '=' def_arg
> +def_elem: def_key '=' def_arg
> {
> $$ = makeDefElem($1, (Node *) $3, @1);
> }
> - | ColLabel
> + | def_key
> {
> $$ = makeDefElem($1, NULL, @1);
> }
> ;

> +def_key:
> + ColLabel { $$ = $1; }
> + | ColLabel ColLabel { $$ = psprintf("%s %s", $1, $2); }
> + ;
> +

Not quite sure what this is about? Doesn't that change the accepted
syntax in a bunch of places?

> @@ -2337,6 +2338,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
> bms_free(relation->rd_indexattr);
> bms_free(relation->rd_keyattr);
> bms_free(relation->rd_idattr);
> + if (relation->rd_pubactions)
> + pfree(relation->rd_pubactions);
> if (relation->rd_options)
> pfree(relation->rd_options);
> if (relation->rd_indextuple)
> @@ -4992,6 +4995,67 @@ RelationGetExclusionInfo(Relation indexRelation,
> MemoryContextSwitchTo(oldcxt);
> }
>
> +/*
> + * Get publication actions for the given relation.
> + */
> +struct PublicationActions *
> +GetRelationPublicationActions(Relation relation)
> +{
> + List *puboids;
> + ListCell *lc;
> + MemoryContext oldcxt;
> + PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
> +
> + if (relation->rd_pubactions)
> + return memcpy(pubactions, relation->rd_pubactions,
> + sizeof(PublicationActions));
> +
> + /* Fetch the publication membership info. */
> + puboids = GetRelationPublications(RelationGetRelid(relation));
> + puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
> +
> + foreach(lc, puboids)
> + {
> + Oid pubid = lfirst_oid(lc);
> + HeapTuple tup;
> + Form_pg_publication pubform;
> +
> + tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
> +
> + if (!HeapTupleIsValid(tup))
> + elog(ERROR, "cache lookup failed for publication %u", pubid);
> +
> + pubform = (Form_pg_publication) GETSTRUCT(tup);
> +
> + pubactions->pubinsert |= pubform->pubinsert;
> + pubactions->pubupdate |= pubform->pubupdate;
> + pubactions->pubdelete |= pubform->pubdelete;
> +
> + ReleaseSysCache(tup);
> +
> + /*
> + * If we know everything is replicated, there is no point to check
> + * for other publications.
> + */
> + if (pubactions->pubinsert && pubactions->pubupdate &&
> + pubactions->pubdelete)
> + break;
> + }
> +
> + if (relation->rd_pubactions)
> + {
> + pfree(relation->rd_pubactions);
> + relation->rd_pubactions = NULL;
> + }
> +
> + /* Now save copy of the actions in the relcache entry. */
> + oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
> + relation->rd_pubactions = palloc(sizeof(PublicationActions));
> + memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
> + MemoryContextSwitchTo(oldcxt);
> +
> + return pubactions;
> +}

Hm. Do we actually have enough cache invalidation support to make this
cached version correct? I haven't seen anything in that regard? Seems
to mean that all changes to an ALL TABLES publication need to do a
global relcache invalidation?

- Andres
