Re: row filtering for logical replication

From: Andres Freund <andres(at)anarazel(dot)de>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Euler Taveira <euler(at)eulerto(dot)com>, Greg Nancarrow <gregn4422(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Ajin Cherian <itsajin(at)gmail(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Rahila Syed <rahilasyed90(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Önder Kalacı <onderkalaci(at)gmail(dot)com>, japin <japinli(at)hotmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, David Steele <david(at)pgmasters(dot)net>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Amit Langote <amitlangote09(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: row filtering for logical replication
Date: 2022-01-29 00:31:10
Message-ID: 20220129003110.6ndrrpanem5sb4ee@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

Are there any recent performance evaluations of the overhead of row filters? I
think it'd be good to get some numbers comparing:

1) $workload with master
2) $workload with patch, but no row filters
3) $workload with patch, row filter matching everything
4) $workload with patch, row filter matching few rows

For $workload I think it'd be worth testing:
a) bulk COPY/INSERT into one table
b) Many transactions doing small modifications to one table
c) Many transactions targeting many different tables
d) Interspersed DDL + small changes to a table

> +/*
> + * Initialize for row filter expression execution.
> + */
> +static ExprState *
> +pgoutput_row_filter_init_expr(Node *rfnode)
> +{
> + ExprState *exprstate;
> + Expr *expr;
> +
> + /*
> + * This is the same code as ExecPrepareExpr() but that is not used because
> + * we want to cache the expression. There should probably be another
> + * function in the executor to handle the execution outside a normal Plan
> + * tree context.
> + */
> + expr = expression_planner((Expr *) rfnode);
> + exprstate = ExecInitExpr(expr, NULL);
> +
> + return exprstate;
> +}

In what memory context does this run? Are we taking care to deal with leaks?
I'm pretty sure the planner relies on cleanup via memory contexts.
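
Something like this (untested sketch, the temporary context name is made up)
would keep the planner's garbage out of whatever long-lived context the
caller happens to be in:

    static ExprState *
    pgoutput_row_filter_init_expr(Node *rfnode)
    {
        MemoryContext tmpcxt;
        MemoryContext oldcxt;
        ExprState  *exprstate;
        Expr       *expr;

        /* plan in a throwaway context; the planner leaks otherwise */
        tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
                                       "row filter planning",
                                       ALLOCSET_SMALL_SIZES);
        oldcxt = MemoryContextSwitchTo(tmpcxt);
        expr = expression_planner((Expr *) rfnode);
        MemoryContextSwitchTo(oldcxt);

        /* copy just the planned tree out, then drop the planner garbage */
        expr = copyObject(expr);
        MemoryContextDelete(tmpcxt);

        exprstate = ExecInitExpr(expr, NULL);

        return exprstate;
    }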

> + memset(entry->exprstate, 0, sizeof(entry->exprstate));
> +
> + schemaId = get_rel_namespace(entry->publish_as_relid);
> + schemaPubids = GetSchemaPublications(schemaId);

Isn't this stuff that we've already queried before? If we re-fetch a lot of
information it's not clear to me that it's actually a good idea to defer
building the row filter.

> + am_partition = get_rel_relispartition(entry->publish_as_relid);

All this stuff likely can cause some memory "leakage" if you run it in a
long-lived memory context.

> + /*
> + * Find if there are any row filters for this relation. If there are,
> + * then prepare the necessary ExprState and cache it in
> + * entry->exprstate. To build an expression state, we need to ensure
> + * the following:
> + *
> + * All publication-table mappings must be checked.
> + *
> + * If the relation is a partition and pubviaroot is true, use the row
> + * filter of the topmost partitioned table instead of the row filter of
> + * its own partition.
> + *
> + * Multiple publications might have multiple row filters for this
> + * relation. Since row filter usage depends on the DML operation, there
> + * are multiple lists (one for each operation) to which row filters
> + * will be appended.
> + *
> + * FOR ALL TABLES implies "don't use row filter expression" so it takes
> + * precedence.
> + *
> + * ALL TABLES IN SCHEMA implies "don't use row filter expression" if
> + * the schema is the same as the table schema.
> + */
> + foreach(lc, data->publications)
> + {
> + Publication *pub = lfirst(lc);
> + HeapTuple rftuple = NULL;
> + Datum rfdatum = 0;
> + bool pub_no_filter = false;
> +
> + if (pub->alltables)
> + {
> + /*
> + * If the publication is FOR ALL TABLES then it is treated the
> + * same as if this table has no row filters (even if for other
> + * publications it does).
> + */
> + pub_no_filter = true;
> + }
> + else if (list_member_oid(schemaPubids, pub->oid))
> + {
> + /*
> + * If the publication is FOR ALL TABLES IN SCHEMA and it overlaps
> + * with the current relation in the same schema then this is also
> + * treated the same as if this table has no row filters (even if for
> + * other publications it does).
> + */
> + pub_no_filter = true;

Isn't this basically O(schemas * publications)?
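
If the lookup has to stay here, sorting the schema publication OIDs once
before the loop and binary-searching per publication would at least drop a
factor (untested, plain libc qsort/bsearch just for illustration):

    static int
    cmp_oid(const void *a, const void *b)
    {
        Oid oa = *(const Oid *) a;
        Oid ob = *(const Oid *) b;

        return (oa < ob) ? -1 : (oa > ob) ? 1 : 0;
    }

    /* once, before the foreach over data->publications */
    int   npubids = list_length(schemaPubids);
    Oid  *pubids = (Oid *) palloc(npubids * sizeof(Oid));
    int   i = 0;

    foreach(lc, schemaPubids)
        pubids[i++] = lfirst_oid(lc);
    qsort(pubids, npubids, sizeof(Oid), cmp_oid);

    /* then, per publication: O(log n) membership test */
    if (bsearch(&pub->oid, pubids, npubids, sizeof(Oid), cmp_oid) != NULL)
        pub_no_filter = true;

Although the bigger win would be to not re-derive all of this per relation in
the first place, per the point above.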

> + if (has_filter)
> + {
> + /* Create or reset the memory context for row filters */
> + if (entry->cache_expr_cxt == NULL)
> + entry->cache_expr_cxt = AllocSetContextCreate(CacheMemoryContext,
> + "Row filter expressions",
> + ALLOCSET_DEFAULT_SIZES);
> + else
> + MemoryContextReset(entry->cache_expr_cxt);

I see this started before this patch, but I don't think it's a great idea that
pgoutput does a bunch of stuff in CacheMemoryContext. That makes it
unnecessarily hard to debug leaks.

Seems like all this should live somewhere below ctx->context, allocated in
pgoutput_startup()?

Consider what happens in a long-lived replication connection, where
occasionally there's a transient error causing streaming to stop. At that
point you'll just lose all knowledge of entry->cache_expr_cxt, no?
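
I.e. something like (untested; assumes the PGOutputData is reachable where
the entry is built — data->context is already created below ctx->context in
pgoutput_startup()):

    entry->cache_expr_cxt = AllocSetContextCreate(data->context,
                                                  "Row filter expressions",
                                                  ALLOCSET_DEFAULT_SIZES);

Then an error tearing down the decoding context frees the filter expressions
too, instead of orphaning them under CacheMemoryContext.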

> +
> +/* Initialize the slots for storing the new and old tuples */
> +static void
> +init_tuple_slot(Relation relation, RelationSyncEntry *entry)
> +{
> + MemoryContext oldctx;
> + TupleDesc oldtupdesc;
> + TupleDesc newtupdesc;
> +
> + oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> +
> + /*
> + * Create tuple table slots. Create a copy of the TupleDesc as it needs to
> + * live as long as the cache remains.
> + */
> + oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
> + newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
> +
> + entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
> + entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
> +
> + MemoryContextSwitchTo(oldctx);
> +}

This *definitely* shouldn't be allocated in CacheMemoryContext. It's one thing
to have a named context below CacheMemoryContext, that's still somewhat
identifiable. But allocating directly in CacheMemoryContext is almost always a
bad idea.

What is supposed to clean any of this up in case of error?
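
Even just switching to the plugin's own context would be an improvement
(untested sketch, again reusing the data->context from pgoutput_startup()):

    /* same function body, but not in bare CacheMemoryContext */
    oldctx = MemoryContextSwitchTo(data->context);

    oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
    newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));

    entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);

    MemoryContextSwitchTo(oldctx);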

I guess I'll start a separate thread about memory handling in pgoutput :/

> + /*
> + * We need this map to avoid relying on ReorderBufferChangeType enums
> + * having specific values.
> + */
> + static int map_changetype_pubaction[] = {
> + [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
> + [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
> + [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
> + };

Why is this "static"? Function-local statics only really make sense for
variables that are changed and should survive between calls to a function.
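
I.e. a plain local would do (re-initialized on each call, but it's three
ints):

    /* no state survives between calls, so no reason for static */
    const int map_changetype_pubaction[] = {
        [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
        [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
        [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    };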

> + Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
> + *action == REORDER_BUFFER_CHANGE_UPDATE ||
> + *action == REORDER_BUFFER_CHANGE_DELETE);
> +
> + Assert(new_slot || old_slot);
> +
> + /* Get the corresponding row filter */
> + filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
> +
> + /* Bail out if there is no row filter */
> + if (!filter_exprstate)
> + return true;
> +
> + elog(DEBUG3, "table \"%s.%s\" has row filter",
> + get_namespace_name(RelationGetNamespace(relation)),
> + RelationGetRelationName(relation));
> +
> + estate = create_estate_for_relation(relation);
> + ecxt = GetPerTupleExprContext(estate);

So we do this for each filtered row? That's a *lot* of
overhead. CreateExecutorState() creates its own memory context, allocates an
EState, then GetPerTupleExprContext() allocates an ExprContext, which then
creates another memory context.

I don't really see any need to allocate this over-and-over?
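
The EState could be built once per RelationSyncEntry (or even once per
decoding context) and just be reset per tuple — untested sketch, the estate
field on the entry is made up:

    Datum  ret;
    bool   isnull;

    /* build lazily, once per cache entry, not per filtered row */
    if (entry->estate == NULL)
        entry->estate = create_estate_for_relation(relation);

    ecxt = GetPerTupleExprContext(entry->estate);
    ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;

    ret = ExecEvalExprSwitchContext(filter_exprstate, ecxt, &isnull);

    /* discard per-row evaluation allocations */
    ResetPerTupleExprContext(entry->estate);

    return !isnull && DatumGetBool(ret);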

> case REORDER_BUFFER_CHANGE_INSERT:
> {
> - HeapTuple tuple = &change->data.tp.newtuple->tuple;
> + /*
> + * Schema should be sent before the logic that replaces the
> + * relation because it also sends the ancestor's relation.
> + */
> + maybe_send_schema(ctx, change, relation, relentry);
> +
> + new_slot = relentry->new_slot;
> +
> + ExecClearTuple(new_slot);
> + ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
> + new_slot, false);

Why? This isn't free, and you're doing it unconditionally. I'd bet this alone
is a noticeable slowdown over the current state.
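
Gating it on a filter actually being present should avoid that; roughly
(untested, and the no-filter path downstream would need to keep accepting a
bare HeapTuple as before):

    /* only pay for the slot work when there's an insert filter at all */
    if (relentry->exprstate[PUBACTION_INSERT] != NULL)
    {
        new_slot = relentry->new_slot;
        ExecClearTuple(new_slot);
        ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
                           new_slot, false);
    }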

Greetings,

Andres Freund
