Re: row filtering for logical replication - Mailing list pgsql-hackers
From: Andres Freund
Subject: Re: row filtering for logical replication
Date:
Msg-id: 20220129003110.6ndrrpanem5sb4ee@alap3.anarazel.de
In response to: RE: row filtering for logical replication ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List: pgsql-hackers
Hi,

Are there any recent performance evaluations of the overhead of row filters? I
think it'd be good to get some numbers comparing:

1) $workload with master
2) $workload with patch, but no row filters
3) $workload with patch, row filter matching everything
4) $workload with patch, row filter matching few rows

For workload I think it'd be worth testing:
a) bulk COPY/INSERT into one table
b) Many transactions doing small modifications to one table
c) Many transactions targeting many different tables
d) Interspersed DDL + small changes to a table

> +/*
> + * Initialize for row filter expression execution.
> + */
> +static ExprState *
> +pgoutput_row_filter_init_expr(Node *rfnode)
> +{
> +    ExprState  *exprstate;
> +    Expr       *expr;
> +
> +    /*
> +     * This is the same code as ExecPrepareExpr() but that is not used because
> +     * we want to cache the expression. There should probably be another
> +     * function in the executor to handle the execution outside a normal Plan
> +     * tree context.
> +     */
> +    expr = expression_planner((Expr *) rfnode);
> +    exprstate = ExecInitExpr(expr, NULL);
> +
> +    return exprstate;
> +}

In what memory context does this run? Are we taking care to deal with leaks?
I'm pretty sure the planner relies on cleanup via memory contexts.

> +    memset(entry->exprstate, 0, sizeof(entry->exprstate));
> +
> +    schemaId = get_rel_namespace(entry->publish_as_relid);
> +    schemaPubids = GetSchemaPublications(schemaId);

Isn't this stuff that we've already queried before? If we re-fetch a lot of
information it's not clear to me that it's actually a good idea to defer
building the row filter.

> +    am_partition = get_rel_relispartition(entry->publish_as_relid);

All this stuff likely can cause some memory "leakage" if you run it in a
long-lived memory context.

> +    /*
> +     * Find if there are any row filters for this relation. If there are,
> +     * then prepare the necessary ExprState and cache it in
> +     * entry->exprstate. To build an expression state, we need to ensure
> +     * the following:
> +     *
> +     * All publication-table mappings must be checked.
> +     *
> +     * If the relation is a partition and pubviaroot is true, use the row
> +     * filter of the topmost partitioned table instead of the row filter of
> +     * its own partition.
> +     *
> +     * Multiple publications might have multiple row filters for this
> +     * relation. Since row filter usage depends on the DML operation, there
> +     * are multiple lists (one for each operation) to which row filters
> +     * will be appended.
> +     *
> +     * FOR ALL TABLES implies "don't use row filter expression" so it takes
> +     * precedence.
> +     *
> +     * ALL TABLES IN SCHEMA implies "don't use row filter expression" if
> +     * the schema is the same as the table schema.
> +     */
> +    foreach(lc, data->publications)
> +    {
> +        Publication *pub = lfirst(lc);
> +        HeapTuple    rftuple = NULL;
> +        Datum        rfdatum = 0;
> +        bool         pub_no_filter = false;
> +
> +        if (pub->alltables)
> +        {
> +            /*
> +             * If the publication is FOR ALL TABLES then it is treated the
> +             * same as if this table has no row filters (even if for other
> +             * publications it does).
> +             */
> +            pub_no_filter = true;
> +        }
> +        else if (list_member_oid(schemaPubids, pub->oid))
> +        {
> +            /*
> +             * If the publication is FOR ALL TABLES IN SCHEMA and it overlaps
> +             * with the current relation in the same schema then this is also
> +             * treated same as if this table has no row filters (even if for
> +             * other publications it does).
> +             */
> +            pub_no_filter = true;

Isn't this basically O(schemas * publications)?
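Back to the memory-context question above: a minimal, untested sketch of one
way to make the context explicit (the extra MemoryContext parameter is not
something the patch has, it's just for illustration). Whatever the planner
allocates then lives in the caller-supplied context and goes away when that
context is reset:

static ExprState *
pgoutput_row_filter_init_expr(Node *rfnode, MemoryContext exprcxt)
{
    MemoryContext oldcxt;
    ExprState  *exprstate;
    Expr       *expr;

    /* Run the planner and expression setup in the caller's context. */
    oldcxt = MemoryContextSwitchTo(exprcxt);

    /* Same steps as ExecPrepareExpr(), but the result is cached by the caller. */
    expr = expression_planner((Expr *) rfnode);
    exprstate = ExecInitExpr(expr, NULL);

    MemoryContextSwitchTo(oldcxt);

    return exprstate;
}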
> +    if (has_filter)
> +    {
> +        /* Create or reset the memory context for row filters */
> +        if (entry->cache_expr_cxt == NULL)
> +            entry->cache_expr_cxt = AllocSetContextCreate(CacheMemoryContext,
> +                                                          "Row filter expressions",
> +                                                          ALLOCSET_DEFAULT_SIZES);
> +        else
> +            MemoryContextReset(entry->cache_expr_cxt);

I see this started before this patch, but I don't think it's a great idea that
pgoutput does a bunch of stuff in CacheMemoryContext. That makes it
unnecessarily hard to debug leaks. Seems like all this should live somewhere
below ctx->context, allocated in pgoutput_startup()?

Consider what happens in a long-lived replication connection, where
occasionally there's a transient error causing streaming to stop. At that
point you'll just lose all knowledge of entry->cache_expr_cxt, no?

> +
> +/* Initialize the slot for storing new and old tuples */
> +static void
> +init_tuple_slot(Relation relation, RelationSyncEntry *entry)
> +{
> +    MemoryContext oldctx;
> +    TupleDesc     oldtupdesc;
> +    TupleDesc     newtupdesc;
> +
> +    oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> +
> +    /*
> +     * Create tuple table slots. Create a copy of the TupleDesc as it needs to
> +     * live as long as the cache remains.
> +     */
> +    oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
> +    newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
> +
> +    entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
> +    entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
> +
> +    MemoryContextSwitchTo(oldctx);
> +}

This *definitely* shouldn't be allocated in CacheMemoryContext. It's one thing
to have a named context below CacheMemoryContext, that's still somewhat
identifiable. But allocating directly in CacheMemoryContext is almost always a
bad idea.

What is supposed to clean any of this up in case of error?

I guess I'll start a separate thread about memory handling in pgoutput :/

> +    /*
> +     * We need this map to avoid relying on ReorderBufferChangeType enums
> +     * having specific values.
> +     */
> +    static int  map_changetype_pubaction[] = {
> +        [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
> +        [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
> +        [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
> +    };

Why is this "static"? Function-local statics only really make sense for
variables that are changed and should survive between calls to a function.

> +    Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
> +           *action == REORDER_BUFFER_CHANGE_UPDATE ||
> +           *action == REORDER_BUFFER_CHANGE_DELETE);
> +
> +    Assert(new_slot || old_slot);
> +
> +    /* Get the corresponding row filter */
> +    filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
> +
> +    /* Bail out if there is no row filter */
> +    if (!filter_exprstate)
> +        return true;
> +
> +    elog(DEBUG3, "table \"%s.%s\" has row filter",
> +         get_namespace_name(RelationGetNamespace(relation)),
> +         RelationGetRelationName(relation));
> +
> +    estate = create_estate_for_relation(relation);
> +    ecxt = GetPerTupleExprContext(estate);

So we do this for each filtered row? That's a *lot* of overhead.
CreateExecutorState() creates its own memory context, allocates an EState,
then GetPerTupleExprContext() allocates an ExprContext, which then creates
another memory context. I don't really see any need to allocate this
over-and-over?
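Roughly what I'd have in mind, as an untested sketch (the estate member on
RelationSyncEntry is made up here, and in which context it should live ties
back to the questions above): create the executor state once per cache entry
and only reset the per-tuple context between rows.

/*
 * Return a cached EState for this relation entry, creating it on first use.
 * (The "estate" member on RelationSyncEntry is hypothetical.)
 */
static EState *
pgoutput_get_cached_estate(RelationSyncEntry *entry, Relation relation)
{
    if (entry->estate == NULL)
        entry->estate = create_estate_for_relation(relation);

    return entry->estate;
}

Then pgoutput_row_filter() would do something like:

    estate = pgoutput_get_cached_estate(entry, relation);
    ecxt = GetPerTupleExprContext(estate);

    /* ... evaluate filter_exprstate against the slot ... */

    /* Discard per-tuple allocations instead of the whole EState. */
    ResetPerTupleExprContext(estate);

That turns the per-row cost into a context reset rather than a context
create/destroy pair plus EState/ExprContext allocations.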
>          case REORDER_BUFFER_CHANGE_INSERT:
>              {
> -                HeapTuple    tuple = &change->data.tp.newtuple->tuple;
> +                /*
> +                 * Schema should be sent before the logic that replaces the
> +                 * relation because it also sends the ancestor's relation.
> +                 */
> +                maybe_send_schema(ctx, change, relation, relentry);
> +
> +                new_slot = relentry->new_slot;
> +
> +                ExecClearTuple(new_slot);
> +                ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
> +                                   new_slot, false);

Why? This isn't free, and you're doing it unconditionally. I'd bet this alone
is a noticeable slowdown over the current state.
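Something along these lines (untested sketch; PUBACTION_INSERT and the
exprstate array are from earlier in the patch) would keep the slot work off
the common path, only paying for it when an INSERT row filter is actually
attached to the entry:

        case REORDER_BUFFER_CHANGE_INSERT:
            {
                HeapTuple   tuple = &change->data.tp.newtuple->tuple;

                maybe_send_schema(ctx, change, relation, relentry);

                /* Only materialize into the slot if a filter will run. */
                if (relentry->exprstate[PUBACTION_INSERT] != NULL)
                {
                    new_slot = relentry->new_slot;

                    ExecClearTuple(new_slot);
                    ExecStoreHeapTuple(tuple, new_slot, false);

                    /* ... evaluate the row filter against new_slot ... */
                }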
Greetings,

Andres Freund