From b2ab430f4ac14c608f9d1bf678863fd01850453b Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 20 Sep 2021 05:10:42 -0400 Subject: [PATCH v29] support updates based on old and new tuple in row filters When applying row filter on updates, check both old_tuple and new_tuple to decide how an update needs to be transformed. UPDATE old-row (match) new-row (no match) -> DELETE old-row (no match) new row (match) -> INSERT old-row (match) new row (match) -> UPDATE old-row (no match) new-row (no match) -> (drop change) --- src/backend/replication/pgoutput/pgoutput.c | 159 +++++++++++++++++++++++++--- src/include/replication/reorderbuffer.h | 6 +- src/test/subscription/t/025_row_filter.pl | 4 +- 3 files changed, 152 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ce5e1c5..18c6cbf 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -167,10 +167,14 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, /* row filter routines */ static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); +static bool pgoutput_row_filter_update(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, RelationSyncEntry *entry, + ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -734,18 +738,110 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) } /* - * Change is checked against the row filter, if any. 
- * - * If it returns true, the change is replicated, otherwise, it is not. + * Check an UPDATE against the row filter, if any. + * Updates are transformed to inserts and deletes based on the + * old_tuple and new_tuple. The resulting action is stored in the + * action parameter; if it is not stored, action is left unchanged. + * old-row (match) new-row (no match) -> DELETE + * old-row (no match) new-row (match) -> INSERT + * old-row (match) new-row (match) -> UPDATE + * old-row (no match) new-row (no match) -> (drop change) + * If it returns true, the change is to be replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter_update(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry, ReorderBufferChangeType *action) { - EState *estate; - ExprContext *ecxt; + /* Bail out if there is no row filter */ + if (entry->exprstate_list == NIL) + return true; + + /* An update requires a new tuple */ + Assert(newtuple); + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + get_rel_name(relation->rd_id)); + + /* + * If no old_tuple, then none of the replica identity columns changed + * and this would reduce to a simple update. + */ + if (!oldtuple) + return pgoutput_row_filter(relation, NULL, newtuple, entry); + + { + TupleDesc desc = entry->scantuple->tts_tupleDescriptor; + int i; + bool old_matched, new_matched; + Datum *values_old = (Datum *) palloc(desc->natts * sizeof(Datum)); + Datum *values_new = (Datum *) palloc(desc->natts * sizeof(Datum)); + bool *isnull_old = (bool *) palloc(desc->natts * sizeof(bool)); + bool *isnull_new = (bool *) palloc(desc->natts * sizeof(bool)); + HeapTuple tmpoldtuple; + HeapTuple tmpnewtuple; + + /* + * We need to apply the row filter on both the old tuple and the new tuple. 
+ * But the old tuple only has changed columns that are part of the replica identity. + * To complete the set of replica identity columns in the old tuple, copy over the + * columns from the new tuple. Also, unchanged toasted replica identity columns are + * only detoasted in the old tuple; copy these over to the new tuple. + */ + heap_deform_tuple(newtuple, desc, values_new, isnull_new); + heap_deform_tuple(oldtuple, desc, values_old, isnull_old); + + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (isnull_new[i]) + continue; + + if (isnull_old[i]) + { + values_old[i] = values_new[i]; + isnull_old[i] = false; + } + + if ((att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values_new[i])) && + (!isnull_old[i] && !(VARATT_IS_EXTERNAL_ONDISK(values_old[i])))) + values_new[i] = values_old[i]; + } + tmpoldtuple = heap_form_tuple(desc, values_old, isnull_old); + tmpnewtuple = heap_form_tuple(desc, values_new, isnull_new); + + old_matched = pgoutput_row_filter(relation, NULL, tmpoldtuple, entry); + new_matched = pgoutput_row_filter(relation, NULL, tmpnewtuple, entry); + + if (!old_matched && !new_matched) + return false; + + if (old_matched && new_matched) + { + *action = REORDER_BUFFER_CHANGE_UPDATE; + } + else if (old_matched && !new_matched) + { + *action = REORDER_BUFFER_CHANGE_DELETE; + } + else if (new_matched && !old_matched) + { + *action = REORDER_BUFFER_CHANGE_INSERT; + } + + return true; + } +} + +/* + * Initialize the row filter, the first time. 
+ */ + +static void +pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry) +{ + Oid relid = RelationGetRelid(relation); ListCell *lc; - bool result = true; - Oid relid = RelationGetRelid(relation); /* * If the row filter caching is currently flagged "invalid" then it means we @@ -846,6 +942,21 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H entry->rowfilter_valid = true; } +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + bool result = true; + Oid relid = RelationGetRelid(relation); + ListCell *lc; /* Bail out if there is no row filter */ if (entry->exprstate_list == NIL) @@ -941,6 +1052,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Initialize the row_filter */ + pgoutput_row_filter_init(data, relation, relentry); + /* Send the data */ switch (change->action) { @@ -949,7 +1063,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) break; /* @@ -980,9 +1094,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; HeapTuple newtuple = &change->data.tp.newtuple->tuple; + ReorderBufferChangeType modified_action = REORDER_BUFFER_CHANGE_UPDATE; /* Check row filter. 
*/ - if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter_update(relation, oldtuple, newtuple, relentry, + &modified_action)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1005,6 +1121,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + + /* Emit the message for the (possibly transformed) action, exactly once. */ + switch (modified_action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, relation, newtuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + logicalrep_write_update(ctx->out, xid, relation, oldtuple, + newtuple, data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, + data->binary); + break; + default: + Assert(false); + } + OutputPluginWrite(ctx, true); @@ -1016,7 +1150,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b40ff7..aec0059 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,7 +51,7 @@ typedef struct ReorderBufferTupleBuf * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of * logical decoding don't have to care about these. 
*/ -enum ReorderBufferChangeType +typedef enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, @@ -65,7 +65,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE -}; +} ReorderBufferChangeType; /* forward declaration */ struct ReorderBufferTXN; @@ -82,7 +82,7 @@ typedef struct ReorderBufferChange XLogRecPtr lsn; /* The type of change. */ - enum ReorderBufferChangeType action; + ReorderBufferChangeType action; /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; diff --git a/src/test/subscription/t/025_row_filter.pl b/src/test/subscription/t/025_row_filter.pl index dc9becc..742bbbe 100644 --- a/src/test/subscription/t/025_row_filter.pl +++ b/src/test/subscription/t/025_row_filter.pl @@ -220,7 +220,8 @@ $node_publisher->wait_for_catchup($appname); # # - 1001, 1002, 1980 already exist from initial data copy # - INSERT (800, 'test 800') NO, because 800 is not > 1000 -# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered' +# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered', +# but row deleted after the update below. # - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered' # - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered' # - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered' @@ -232,7 +233,6 @@ $result = "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); is($result, qq(1001|test 1001 1002|test 1002 -1600|test 1600 1601|test 1601 updated 1980|not filtered), 'check replicated rows to table tab_rowfilter_1'); -- 1.8.3.1