From be6758cbf52f4587d2676bdc41e9198aeeacf240 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 11 Nov 2021 04:57:18 -0500 Subject: [PATCH v39 3/6] PS - ExprState cache modifications. Now the cached row-filter caches (e.g. ExprState *) are invalidated only in rel_sync_cache_relation_cb function, so it means the ALTER PUBLICATION for one table should not cause row-filters of other tables to also become invalidated. Also all code related to caching row-filters has been removed from the get_rel_sync_entry function and is now done just before they are needed in the pgoutput_row_filter function. If there are multiple publication filters for a given table these are are all combined into a single filter. Author: Peter Smith, Greg Nancarrow Changes are based on a suggestions from Amit [1] [2], and Houz [3] [1] https://www.postgresql.org/message-id/CAA4eK1%2BxQb06NGs6Y7OzwMtKYYixEqR8tdWV5THAVE4SAqNrDg%40mail.gmail.com [2] https://www.postgresql.org/message-id/CAA4eK1%2Btio46goUKBUfAKFsFVxtgk8nOty%3DTxKoKH-gdLzHD2g%40mail.gmail.com [3] https://www.postgresql.org/message-id/OS0PR01MB5716090A70A73ADF58C58950948D9%40OS0PR01MB5716.jpnprd01.prod.outlook.com --- src/backend/replication/pgoutput/pgoutput.c | 229 +++++++++++++++++++--------- 1 file changed, 154 insertions(+), 75 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 077ae18..f9fdbb0 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1,4 +1,4 @@ -/*------------------------------------------------------------------------- +/*------------------------------------------------------------------------ * * pgoutput.c * Logical Replication output plugin @@ -21,6 +21,7 @@ #include "executor/executor.h" #include "fmgr.h" #include "nodes/nodeFuncs.h" +#include "nodes/makefuncs.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "replication/logical.h" @@ -123,7 +124,15 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; - List *exprstate; /* ExprState for row filter */ + + /* + * Row-filter related members: + * The flag 'rowfilter_valid' only means the exprstate * is correct - + * It doesn't mean that there actually is any row filter present for the + * current relid. + */ + bool rowfilter_valid; + ExprState *exprstate; /* ExprState for row filter(s) */ TupleTableSlot *scantuple; /* tuple table slot for row filter */ /* @@ -161,7 +170,7 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, static EState *create_estate_for_relation(Relation rel); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); /* @@ -731,20 +740,134 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) * If it returns true, the change is replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) { EState *estate; ExprContext *ecxt; ListCell *lc; bool result = true; + Oid relid = RelationGetRelid(relation); + List *rfnodes = NIL; + int n_filters; + + /* + * If the row filter caching is currently flagged "invalid" then it means we + * don't know yet if there is/isn't any row filters for this relation. + * + * This code is usually one-time execution. + */ + if (!entry->rowfilter_valid) + { + bool am_partition = get_rel_relispartition(relid); + MemoryContext oldctx; + TupleDesc tupdesc = RelationGetDescr(relation); + + /* + * Create a tuple table slot for row filter. TupleDesc must live as + * long as the cache remains. Release the tuple table slot if it + * already exists. + */ + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + tupdesc = CreateTupleDescCopy(tupdesc); + entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + MemoryContextSwitchTo(oldctx); + + /* + * Find if there are any row filters for this relation. If there are, + * then prepare the necessary ExprState and cache it in entry->exprstate. + * + * NOTE: All publication-table mappings must be checked. + * + * NOTE: If the relation is a partition and pubviaroot is true, use + * the row filter of the topmost partitioned table instead of the row + * filter of its own partition. + */ + foreach(lc, data->publications) + { + Publication *pub = lfirst(lc); + HeapTuple rftuple; + Datum rfdatum; + bool rfisnull; + Oid pub_relid = relid; + + if (pub->pubviaroot && am_partition) + { + if (pub->alltables) + pub_relid = llast_oid(get_partition_ancestors(relid)); + else + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc2; + + /* + * Find the "topmost" ancestor that is in this + * publication. + */ + foreach(lc2, ancestors) + { + Oid ancestor = lfirst_oid(lc2); + + if (list_member_oid(GetRelationPublications(ancestor), + pub->oid)) + { + pub_relid = ancestor; + } + } + } + } + + /* + * Lookup if there is a row-filter, and if yes remember it in a list. + * In code following this 'publications' loop we will combine all filters. + */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(pub_relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rftuple)) + { + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); + + if (!rfisnull) + { + Node *rfnode; + + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + rfnodes = lappend(rfnodes, rfnode); + } + + ReleaseSysCache(rftuple); + } + + } /* loop all subscribed publications */ + + /* + * Combine all the row-filters (if any) into a single filter, and then build the ExprState for it + */ + n_filters = list_length(rfnodes); + if (n_filters > 0) + { + Node *rfnode; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfnode = n_filters > 1 ? makeBoolExpr(AND_EXPR, rfnodes, -1) : linitial(rfnodes); + entry->exprstate = pgoutput_row_filter_init_expr(rfnode); + + list_free(rfnodes); + } + + entry->rowfilter_valid = true; + } /* Bail out if there is no row filter */ - if (entry->exprstate == NIL) + if (!entry->exprstate) return true; elog(DEBUG3, "table \"%s.%s\" has row filter", - get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), - get_rel_name(relation->rd_id)); + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)); PushActiveSnapshot(GetTransactionSnapshot()); @@ -757,20 +880,13 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); /* - * If the subscription has multiple publications and the same table has a - * different row filter in these publications, all row filters must be - * matched in order to replicate this change. + * NOTE: Multiple publication row-filters have already been combined to a + * single exprstate. */ - foreach(lc, entry->exprstate) + if (entry->exprstate) { - ExprState *exprstate = (ExprState *) lfirst(lc); - /* Evaluates row filter */ - result = pgoutput_row_filter_exec_expr(exprstate, ecxt); - - /* If the tuple does not match one of the row filters, bail out */ - if (!result) - break; + result = pgoutput_row_filter_exec_expr(entry->exprstate, ecxt); } /* Cleanup allocated resources */ @@ -840,7 +956,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) break; /* @@ -873,7 +989,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple newtuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -907,7 +1023,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1321,10 +1437,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->replicate_valid = false; + entry->rowfilter_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; - entry->exprstate = NIL; + entry->exprstate = NULL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1344,7 +1461,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; - TupleDesc tupdesc = RelationGetDescr(relation); /* Reload publications if needed before use. */ if (!publications_valid) @@ -1358,22 +1474,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) publications_valid = true; } - /* Release tuple table slot */ - if (entry->scantuple != NULL) - { - ExecDropSingleTupleTableSlot(entry->scantuple); - entry->scantuple = NULL; - } - - /* - * Create a tuple table slot for row filter. TupleDesc must live as - * long as the cache remains. - */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - tupdesc = CreateTupleDescCopy(tupdesc); - entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); - MemoryContextSwitchTo(oldctx); - /* * Build publication cache. We can't use one provided by relcache as * relcache considers all publications given relation is in, but here @@ -1383,9 +1483,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Publication *pub = lfirst(lc); bool publish = false; - HeapTuple rftuple; - Datum rfdatum; - bool rfisnull; if (pub->alltables) { @@ -1449,33 +1546,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - /* - * Cache row filter, if available. All publication-table mappings - * must be checked. If it is a partition and pubviaroot is true, - * use the row filter of the topmost partitioned table instead of - * the row filter of its own partition. - */ - rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid)); - if (HeapTupleIsValid(rftuple)) - { - rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); - - if (!rfisnull) - { - Node *rfnode; - ExprState *exprstate; - - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - rfnode = stringToNode(TextDatumGetCString(rfdatum)); - - /* Prepare for expression execution */ - exprstate = pgoutput_row_filter_init_expr(rfnode); - entry->exprstate = lappend(entry->exprstate, exprstate); - MemoryContextSwitchTo(oldctx); - } - - ReleaseSysCache(rftuple); - } } list_free(pubids); @@ -1582,6 +1652,21 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) free_conversion_map(entry->map); } entry->map = NULL; + + /* + * Row filter cache cleanups. (Will be rebuilt later if needed). + */ + entry->rowfilter_valid = false; + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + if (entry->exprstate != NULL) + { + free(entry->exprstate); + entry->exprstate = NULL; + } } } @@ -1622,12 +1707,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - - if (entry->exprstate != NIL) - { - list_free_deep(entry->exprstate); - entry->exprstate = NIL; - } } MemoryContextSwitchTo(oldctx); -- 1.8.3.1