From 72e0430375c827ab19fea58c3385b788578c7d31 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Thu, 18 Nov 2021 11:36:22 +1100 Subject: [PATCH v40] Support updates based on old and new tuple in row filters. When applying a row filter to updates, check both old_tuple and new_tuple to decide how an update needs to be transformed. UPDATE old-row (match) new-row (no match) -> DELETE old-row (no match) new-row (match) -> INSERT old-row (match) new-row (match) -> UPDATE old-row (no match) new-row (no match) -> (drop change) Also, tuples that have been deformed are cached in slots to avoid deforming the same tuple multiple times. Author: Ajin Cherian --- src/backend/catalog/pg_publication.c | 12 +- src/backend/replication/logical/proto.c | 35 ++-- src/backend/replication/pgoutput/pgoutput.c | 243 ++++++++++++++++++++++++++-- src/include/replication/logicalproto.h | 7 +- src/include/replication/reorderbuffer.h | 6 +- src/test/subscription/t/025_row_filter.pl | 4 +- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 267 insertions(+), 41 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 7174a56..09f0981 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -261,9 +261,9 @@ rowfilter_expr_replident_walker(Node *node, rf_context *context) * Rule 1. Walk the parse-tree and reject anything other than very simple * expressions. (See rowfilter_validator for details what is permitted). * - * Rule 2. If the publish operation contains "delete" then only columns that - * are allowed by the REPLICA IDENTITY rules are permitted to be used in the - * row-filter WHERE clause. + * Rule 2. If the publish operation contains "delete" or "update" then only + * columns that are allowed by the REPLICA IDENTITY rules are permitted to + * be used in the row-filter WHERE clause. 
*/ static void rowfilter_expr_checker(Publication *pub, Relation rel, Node *rfnode) @@ -276,12 +276,10 @@ rowfilter_expr_checker(Publication *pub, Relation rel, Node *rfnode) rowfilter_validator(relname, rfnode); /* - * Rule 2: For "delete", check that filter cols are also valid replica + * Rule 2: For "delete" or "update", check that filter cols are also valid replica * identity cols. - * - * TODO - check later for publish "update" case. */ - if (pub->pubactions.pubdelete) + if (pub->pubactions.pubdelete || pub->pubactions.pubupdate) { char replica_identity = rel->rd_rel->relreplident; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b..6b55a94 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -19,6 +19,7 @@ #include "replication/logicalproto.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "executor/executor.h" /* * Protocol message flags. @@ -31,8 +32,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); - + HeapTuple tuple, TupleTableSlot *slot, + bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -410,7 +411,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, NULL, binary); } /* @@ -442,7 +443,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, TupleTableSlot *oldslot, + TupleTableSlot *newslot, 
bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +465,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, oldslot, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, newslot, binary); } /* @@ -536,7 +538,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, NULL, binary); } /* @@ -749,11 +751,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, TupleTableSlot *slot, +bool binary) { TupleDesc desc; - Datum values[MaxTupleAttributeNumber]; - bool isnull[MaxTupleAttributeNumber]; + Datum *values; + bool *isnull; int i; uint16 nliveatts = 0; @@ -771,7 +774,17 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar enlargeStringInfo(out, tuple->t_len + nliveatts * (1 + 4)); - heap_deform_tuple(tuple, desc, values, isnull); + if (slot == NULL || TTS_EMPTY(slot)) + { + values = (Datum *) palloc(desc->natts * sizeof(Datum)); + isnull = (bool *) palloc(desc->natts * sizeof(bool)); + heap_deform_tuple(tuple, desc, values, isnull); + } + else + { + values = slot->tts_values; + isnull = slot->tts_isnull; + } /* Write the values */ for (i = 0; i < desc->natts; i++) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa7bdc2..a0d1455 
100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -134,7 +134,10 @@ typedef struct RelationSyncEntry */ bool rowfilter_valid; ExprState *exprstate; /* ExprState for row filter(s) */ - TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *new_tuple; /* slot for storing deformed new tuple during updates */ + TupleTableSlot *old_tuple; /* slot for storing deformed old tuple during updates */ + TupleTableSlot *tmp_new_tuple; /* slot for temporary new tuple used for expression evaluation */ /* * OID of the relation to publish changes as. For a partition, this may @@ -169,10 +172,16 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, /* row filter routines */ static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); +static bool pgoutput_row_filter_virtual(Relation relation, TupleTableSlot *slot, + RelationSyncEntry *entry); +static bool pgoutput_row_filter_update_check(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, RelationSyncEntry *entry, + ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -736,17 +745,112 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) } /* - * Change is checked against the row filter, if any. - * - * If it returns true, the change is replicated, otherwise, it is not. + * Update is checked against the row filter, if any. 
+ * Updates may be transformed to inserts or deletes based on the + * old_tuple and new_tuple. The new action is updated in the + * action parameter. If not updated, action remains as update. + * old-row (no match) new-row (no match) -> (drop change) + * old-row (no match) new-row (match) -> INSERT + * old-row (match) new-row (no match) -> DELETE + * old-row (match) new-row (match) -> UPDATE + * Returns true if the change is to be replicated, else false. */ static bool -pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter_update_check(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry, ReorderBufferChangeType *action) +{ + TupleDesc desc = entry->scantuple->tts_tupleDescriptor; + int i; + bool old_matched, new_matched; + TupleTableSlot *tmp_new_slot, *old_slot, *new_slot; + + /* Bail out if there is no row filter */ + if (!entry->exprstate) + return true; + + /* update requires a new tuple */ + Assert(newtuple); + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + get_rel_name(relation->rd_id)); + + /* + * If no old_tuple, then none of the replica identity columns changed + * and this would reduce to a simple update. 
+ */ + if (!oldtuple) + { + *action = REORDER_BUFFER_CHANGE_UPDATE; + return pgoutput_row_filter(relation, NULL, newtuple, entry); + } + + old_slot = entry->old_tuple; + new_slot = entry->new_tuple; + tmp_new_slot = entry->tmp_new_tuple; + ExecClearTuple(old_slot); + ExecClearTuple(new_slot); + ExecClearTuple(tmp_new_slot); + + heap_deform_tuple(newtuple, desc, new_slot->tts_values, new_slot->tts_isnull); + heap_deform_tuple(oldtuple, desc, old_slot->tts_values, old_slot->tts_isnull); + + ExecStoreVirtualTuple(old_slot); + ExecStoreVirtualTuple(new_slot); + + tmp_new_slot = ExecCopySlot(tmp_new_slot, new_slot); + + /* + * For updates, both the newtuple and oldtuple need to be checked + * against the row-filter. The newtuple might not have all the + * replica identity columns, in which case they need to be + * copied over from the oldtuple. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* if the column in the new_tuple is null, nothing to do */ + if (tmp_new_slot->tts_isnull[i]) + continue; + + /* + * Unchanged toasted replica identity columns are + * only detoasted in the old tuple; copy them over to the newtuple. 
+ */ + if ((att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(tmp_new_slot->tts_values[i])) && + (!old_slot->tts_isnull[i] && + !(VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])))) + { + Assert(!old_slot->tts_isnull[i] && + !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])); + tmp_new_slot->tts_values[i] = old_slot->tts_values[i]; + } + + } + + old_matched = pgoutput_row_filter_virtual(relation, old_slot, entry); + new_matched = pgoutput_row_filter_virtual(relation, tmp_new_slot, entry); + + if (!old_matched && !new_matched) + return false; + + if (!old_matched && new_matched) + *action = REORDER_BUFFER_CHANGE_INSERT; + else if (old_matched && !new_matched) + *action = REORDER_BUFFER_CHANGE_DELETE; + else if (new_matched && old_matched) + *action = REORDER_BUFFER_CHANGE_UPDATE; + + return true; +} + +/* + * Initialize the row filter, the first time. + */ +static void +pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry) { - EState *estate; - ExprContext *ecxt; ListCell *lc; - bool result = true; Oid relid = RelationGetRelid(relation); List *rfnodes = NIL; int n_filters; @@ -774,7 +878,7 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H TupleDesc tupdesc = RelationGetDescr(relation); /* - * Create a tuple table slot for row filter. TupleDesc must live as + * Create tuple table slots for row filter. TupleDesc must live as * long as the cache remains. Release the tuple table slot if it * already exists. 
*/ @@ -783,9 +887,28 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H ExecDropSingleTupleTableSlot(entry->scantuple); entry->scantuple = NULL; } + if (entry->old_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->old_tuple); + entry->old_tuple = NULL; + } + if (entry->new_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->new_tuple); + entry->new_tuple = NULL; + } + if (entry->tmp_new_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->tmp_new_tuple); + entry->tmp_new_tuple = NULL; + } oldctx = MemoryContextSwitchTo(CacheMemoryContext); tupdesc = CreateTupleDescCopy(tupdesc); entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + entry->old_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->new_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->tmp_new_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + MemoryContextSwitchTo(oldctx); /* @@ -866,6 +989,67 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H entry->rowfilter_valid = true; } +} + +/* + * Change is checked against the row filter, if any. + * + * The row is passed in as a virtual slot. + * + */ +static bool +pgoutput_row_filter_virtual(Relation relation, TupleTableSlot *slot, RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + bool result = true; + + /* Bail out if there is no row filter */ + if (!entry->exprstate) + return true; + + if (message_level_is_interesting(DEBUG3)) + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(entry->relid)), + get_rel_name(entry->relid)); + + PushActiveSnapshot(GetTransactionSnapshot()); + + estate = create_estate_for_relation(relation); + + /* Prepare context per tuple */ + ecxt = GetPerTupleExprContext(estate); + ecxt->ecxt_scantuple = slot; + + /* + * NOTE: Multiple publication row-filters have already been combined to a + * single exprstate. 
+ */ + if (entry->exprstate) + { + /* Evaluates row filter */ + result = pgoutput_row_filter_exec_expr(entry->exprstate, ecxt); + } + + /* Cleanup allocated resources */ + FreeExecutorState(estate); + PopActiveSnapshot(); + + return result; +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + bool result = true; + Oid relid = RelationGetRelid(relation); /* Bail out if there is no row filter */ if (!entry->exprstate) @@ -896,7 +1080,6 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H } /* Cleanup allocated resources */ - ResetExprContext(ecxt); FreeExecutorState(estate); PopActiveSnapshot(); @@ -954,6 +1137,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Initialize the row_filter */ + pgoutput_row_filter_init(data, relation, relentry); + /* Send the data */ switch (change->action) { @@ -962,7 +1148,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) break; /* @@ -993,9 +1179,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; HeapTuple newtuple = &change->data.tp.newtuple->tuple; + ReorderBufferChangeType modified_action = REORDER_BUFFER_CHANGE_UPDATE; - /* Check row filter. 
*/ - if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter_update_check(relation, oldtuple, newtuple, relentry, + &modified_action)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1018,8 +1205,27 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + + switch (modified_action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, relation, newtuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + logicalrep_write_update(ctx->out, xid, relation, + oldtuple, newtuple, relentry->old_tuple, + relentry->new_tuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, + data->binary); + break; + default: + Assert(false); + } + OutputPluginWrite(ctx, true); break; } @@ -1029,7 +1235,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. 
*/ - if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1447,6 +1653,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; + entry->new_tuple = NULL; + entry->old_tuple = NULL; + entry->tmp_new_tuple = NULL; entry->exprstate = NULL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dc..427c40a 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -16,6 +16,7 @@ #include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" +#include "executor/executor.h" /* * Protocol capabilities @@ -211,7 +212,11 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid, extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary); +extern void logicalrep_write_update_cached(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldtuple, TupleTableSlot *newtuple, + bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b40ff7..aec0059 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,7 +51,7 @@ typedef struct 
ReorderBufferTupleBuf * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of * logical decoding don't have to care about these. */ -enum ReorderBufferChangeType +typedef enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, @@ -65,7 +65,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE -}; +} ReorderBufferChangeType; /* forward declaration */ struct ReorderBufferTXN; @@ -82,7 +82,7 @@ typedef struct ReorderBufferChange XLogRecPtr lsn; /* The type of change. */ - enum ReorderBufferChangeType action; + ReorderBufferChangeType action; /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; diff --git a/src/test/subscription/t/025_row_filter.pl b/src/test/subscription/t/025_row_filter.pl index 2703470..96bfe2b 100644 --- a/src/test/subscription/t/025_row_filter.pl +++ b/src/test/subscription/t/025_row_filter.pl @@ -264,7 +264,8 @@ is($result, qq(13|0|12), 'check replicated rows to tab_rowfilter_4'); # # - 1001, 1002, 1980 already exist from initial data copy # - INSERT (800, 'test 800') NO, because 800 is not > 1000 -# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered' +# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered', +# but row deleted after the update below. 
# - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered' # - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered' # - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered' @@ -276,7 +277,6 @@ $result = "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); is($result, qq(1001|test 1001 1002|test 1002 -1600|test 1600 1601|test 1601 updated 1980|not filtered), 'check replicated rows to table tab_rowfilter_1'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index da6ac8e..2f41eac 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2194,6 +2194,7 @@ ReorderBufferApplyChangeCB ReorderBufferApplyTruncateCB ReorderBufferBeginCB ReorderBufferChange +ReorderBufferChangeType ReorderBufferCommitCB ReorderBufferCommitPreparedCB ReorderBufferDiskChange -- 1.8.3.1