From ce10151674e3c53fdf845d5db0a91c582b4d7e70 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 19 Oct 2021 22:33:26 -0400 Subject: [PATCH v33] Support updates based on old and new tuple in row filters. When applying row filter on updates, check both old_tuple and new_tuple to decide how an update needs to be transformed. UPDATE old-row (match) new-row (no match) -> DELETE old-row (no match) new row (match) -> INSERT old-row (match) new row (match) -> UPDATE old-row (no match) new-row (no match) -> (drop change) Also tuples that have been deformed will be cached in slots to avoid multiple deforming of tuples. --- src/backend/replication/logical/proto.c | 122 +++++++++++++ src/backend/replication/pgoutput/pgoutput.c | 264 +++++++++++++++++++++++++--- src/include/replication/logicalproto.h | 4 + src/include/replication/reorderbuffer.h | 6 +- src/test/subscription/t/025_row_filter.pl | 4 +- 5 files changed, 375 insertions(+), 25 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b..6b14340 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -19,6 +19,7 @@ #include "replication/logicalproto.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "executor/executor.h" /* * Protocol message flags. @@ -32,6 +33,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary); +static void logicalrep_write_tuple_cached(StringInfo out, Relation rel, + TupleTableSlot *slot, bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -438,6 +441,38 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) } /* + * Write UPDATE to the output stream using cached virtual slots. + * Cached updates will have both old tuple and new tuple. + */ +void +logicalrep_write_update_cached(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldtuple, TupleTableSlot *newtuple, bool binary) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); + + Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || + rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || + rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + /* use Oid as relation identifier */ + pq_sendint32(out, RelationGetRelid(rel)); + + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + pq_sendbyte(out, 'O'); /* old tuple follows */ + else + pq_sendbyte(out, 'K'); /* old key follows */ + logicalrep_write_tuple_cached(out, rel, oldtuple, binary); + + pq_sendbyte(out, 'N'); /* new tuple follows */ + logicalrep_write_tuple_cached(out, rel, newtuple, binary); +} + + +/* * Write UPDATE to the output stream. */ void @@ -746,6 +781,93 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) } /* + * Write a tuple to the outputstream using cached slot, in the most efficient format possible. + */ +static void +logicalrep_write_tuple_cached(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary) +{ + TupleDesc desc; + int i; + uint16 nliveatts = 0; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, false, NULL); + + desc = RelationGetDescr(rel); + + for (i = 0; i < desc->natts; i++) + { + if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + + /* try to allocate enough memory from the get-go */ + enlargeStringInfo(out, tuple->t_len + + nliveatts * (1 + 4)); + + /* Write the values */ + for (i = 0; i < desc->natts; i++) + { + HeapTuple typtup; + Form_pg_type typclass; + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + + if (slot->tts_isnull[i]) + { + pq_sendbyte(out, LOGICALREP_COLUMN_NULL); + continue; + } + + if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(slot->tts_values[i])) + { + /* + * Unchanged toasted datum. (Note that we don't promise to detect + * unchanged data in general; this is just a cheap check to avoid + * sending large values unnecessarily.) + */ + pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED); + continue; + } + + typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); + if (!HeapTupleIsValid(typtup)) + elog(ERROR, "cache lookup failed for type %u", att->atttypid); + typclass = (Form_pg_type) GETSTRUCT(typtup); + + /* + * Send in binary if requested and type has suitable send function. + */ + if (binary && OidIsValid(typclass->typsend)) + { + bytea *outputbytes; + int len; + + pq_sendbyte(out, LOGICALREP_COLUMN_BINARY); + outputbytes = OidSendFunctionCall(typclass->typsend, slot->tts_values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + char *outputstr; + + pq_sendbyte(out, LOGICALREP_COLUMN_TEXT); + outputstr = OidOutputFunctionCall(typclass->typoutput, slot->tts_values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } + + ReleaseSysCache(typtup); + } +} + + +/* * Write a tuple to the outputstream, in the most efficient format possible. */ static void diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ce5e1c5..5ecfd63 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -132,7 +132,10 @@ typedef struct RelationSyncEntry */ bool rowfilter_valid; List *exprstate_list; /* ExprState for row filter(s) */ - TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *new_tuple; /* slot for storing deformed new tuple during updates */ + TupleTableSlot *old_tuple; /* slot for storing deformed old tuple during updates */ + TupleTableSlot *tmp_new_tuple; /* slot for temporary new tuple used for expression evaluation */ /* * OID of the relation to publish changes as. For a partition, this may @@ -167,10 +170,16 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, /* row filter routines */ static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); +static bool pgoutput_row_filter_virtual(Relation relation, TupleTableSlot *slot, + RelationSyncEntry *entry); +static bool pgoutput_row_filter_update(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, RelationSyncEntry *entry, + ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -734,18 +743,103 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) } /* - * Change is checked against the row filter, if any. - * - * If it returns true, the change is replicated, otherwise, it is not. + * Update is checked against the row filter, if any. + * Updates are transformed to inserts and deletes based on the + * old_tuple and new_tuple. The new action is updated in the + * action parameter. If not updated, action remains as update. + * old-row (match) new-row (no match) -> DELETE + * old-row (no match) new row (match) -> INSERT + * old-row (match) new row (match) -> UPDATE + * old-row (no match) new-row (no match) -> (drop change) + * If it returns true, the change is to be replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter_update(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry, ReorderBufferChangeType *action) { - EState *estate; - ExprContext *ecxt; + TupleDesc desc = entry->scantuple->tts_tupleDescriptor; + int i; + bool old_matched, new_matched; + TupleTableSlot *tmp_new_slot, *old_slot, *new_slot; + + /* Bail out if there is no row filter */ + if (entry->exprstate_list == NIL) + return true; + + /* update require a new tuple */ + Assert(newtuple); + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + get_rel_name(relation->rd_id)); + + /* + * If no old_tuple, then none of the replica identity colums changed + * and this would reduce to a simple update. + */ + if (!oldtuple) + return pgoutput_row_filter(relation, NULL, newtuple, entry); + + + old_slot = entry->old_tuple; + new_slot = entry->new_tuple; + tmp_new_slot = entry->tmp_new_tuple; + ExecClearTuple(old_slot); + ExecClearTuple(new_slot); + ExecClearTuple(tmp_new_slot); + + /* + * Unchanged toasted replica identity columns are + * only detoasted in the old tuple, copy this over to the newtuple. + */ + heap_deform_tuple(newtuple, desc, new_slot->tts_values, new_slot->tts_isnull); + heap_deform_tuple(oldtuple, desc, old_slot->tts_values, old_slot->tts_isnull); + + ExecStoreVirtualTuple(old_slot); + ExecStoreVirtualTuple(new_slot); + + tmp_new_slot = ExecCopySlot(tmp_new_slot, new_slot); + + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (tmp_new_slot->tts_isnull[i]) + continue; + + if ((att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(tmp_new_slot->tts_values[i])) && + (!old_slot->tts_isnull[i] && + !(VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])))) + { + tmp_new_slot->tts_values[i] = old_slot->tts_values[i]; + } + + } + + old_matched = pgoutput_row_filter_virtual(relation, old_slot, entry); + new_matched = pgoutput_row_filter_virtual(relation, tmp_new_slot, entry); + + if (!old_matched && !new_matched) + return false; + + if (old_matched && new_matched) + *action = REORDER_BUFFER_CHANGE_UPDATE; + else if (old_matched && !new_matched) + *action = REORDER_BUFFER_CHANGE_DELETE; + else if (new_matched && !old_matched) + *action = REORDER_BUFFER_CHANGE_INSERT; + + return true; +} + +/* + * Initialize the row filter, the first time. + */ + +static void +pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry) +{ + Oid relid = RelationGetRelid(relation); ListCell *lc; - bool result = true; - Oid relid = RelationGetRelid(relation); /* * If the row filter caching is currently flagged "invalid" then it means we @@ -760,7 +854,7 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H TupleDesc tupdesc = RelationGetDescr(relation); /* - * Create a tuple table slot for row filter. TupleDesc must live as + * Create tuple table slots for row filter. TupleDesc must live as * long as the cache remains. Release the tuple table slot if it * already exists. */ @@ -769,9 +863,31 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H ExecDropSingleTupleTableSlot(entry->scantuple); entry->scantuple = NULL; } + if (entry->old_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->old_tuple); + entry->old_tuple = NULL; + } + + if (entry->new_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->new_tuple); + entry->new_tuple = NULL; + } + + if (entry->tmp_new_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->tmp_new_tuple); + entry->tmp_new_tuple = NULL; + } + oldctx = MemoryContextSwitchTo(CacheMemoryContext); tupdesc = CreateTupleDescCopy(tupdesc); entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + entry->old_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->new_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->tmp_new_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + MemoryContextSwitchTo(oldctx); /* @@ -846,6 +962,76 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H entry->rowfilter_valid = true; } +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter_virtual(Relation relation, TupleTableSlot *slot, RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + bool result = true; + Oid relid = RelationGetRelid(relation); + ListCell *lc; + + /* Bail out if there is no row filter */ + if (entry->exprstate_list == NIL) + return true; + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)); + + PushActiveSnapshot(GetTransactionSnapshot()); + + estate = create_estate_for_relation(relation); + + /* Prepare context per tuple */ + ecxt = GetPerTupleExprContext(estate); + ecxt->ecxt_scantuple = slot; + + /* + * If the subscription has multiple publications and the same table has a + * different row filter in these publications, all row filters must be + * matched in order to replicate this change. + */ + foreach(lc, entry->exprstate_list) + { + ExprState *exprstate = (ExprState *) lfirst(lc); + + /* Evaluates row filter */ + result = pgoutput_row_filter_exec_expr(exprstate, ecxt); + + /* If the tuple does not match one of the row filters, bail out */ + if (!result) + break; + } + + /* Cleanup allocated resources */ + ResetExprContext(ecxt); + FreeExecutorState(estate); + PopActiveSnapshot(); + + return result; +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + bool result = true; + Oid relid = RelationGetRelid(relation); + ListCell *lc; /* Bail out if there is no row filter */ if (entry->exprstate_list == NIL) @@ -941,6 +1127,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Initialize the row_filter */ + pgoutput_row_filter_init(data, relation, relentry); + /* Send the data */ switch (change->action) { @@ -949,7 +1138,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) break; /* @@ -980,9 +1169,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; HeapTuple newtuple = &change->data.tp.newtuple->tuple; + ReorderBufferChangeType modified_action = REORDER_BUFFER_CHANGE_UPDATE; - /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter_update(relation, oldtuple, newtuple, relentry, + &modified_action)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1005,8 +1195,29 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + + switch (modified_action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, relation, newtuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + if (relentry->new_tuple != NULL && !TTS_EMPTY(relentry->new_tuple)) + logicalrep_write_update_cached(ctx->out, xid, relation, + relentry->old_tuple, relentry->new_tuple, data->binary); + else + logicalrep_write_update(ctx->out, xid, relation, oldtuple, + newtuple, data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, + data->binary); + break; + default: + Assert(false); + } + OutputPluginWrite(ctx, true); break; } @@ -1016,7 +1227,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1431,6 +1642,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; + entry->new_tuple = NULL; + entry->old_tuple = NULL; + entry->tmp_new_tuple = NULL; entry->exprstate_list = NIL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if @@ -1635,10 +1849,20 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) * Row filter cache cleanups. (Will be rebuilt later if needed). */ entry->rowfilter_valid = false; - if (entry->scantuple != NULL) + if (entry->new_tuple != NULL) { - ExecDropSingleTupleTableSlot(entry->scantuple); - entry->scantuple = NULL; + ExecDropSingleTupleTableSlot(entry->new_tuple); + entry->new_tuple = NULL; + } + if (entry->old_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->old_tuple); + entry->old_tuple = NULL; + } + if (entry->tmp_new_tuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->tmp_new_tuple); + entry->tmp_new_tuple = NULL; } if (entry->exprstate_list != NIL) { diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dc..ba71f3f 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -16,6 +16,7 @@ #include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" +#include "executor/executor.h" /* * Protocol capabilities @@ -212,6 +213,9 @@ extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary); +extern void logicalrep_write_update_cached(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldtuple, TupleTableSlot *newtuple, + bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b40ff7..aec0059 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,7 +51,7 @@ typedef struct ReorderBufferTupleBuf * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of * logical decoding don't have to care about these. */ -enum ReorderBufferChangeType +typedef enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, @@ -65,7 +65,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE -}; +} ReorderBufferChangeType; /* forward declaration */ struct ReorderBufferTXN; @@ -82,7 +82,7 @@ typedef struct ReorderBufferChange XLogRecPtr lsn; /* The type of change. */ - enum ReorderBufferChangeType action; + ReorderBufferChangeType action; /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; diff --git a/src/test/subscription/t/025_row_filter.pl b/src/test/subscription/t/025_row_filter.pl index dc9becc..742bbbe 100644 --- a/src/test/subscription/t/025_row_filter.pl +++ b/src/test/subscription/t/025_row_filter.pl @@ -220,7 +220,8 @@ $node_publisher->wait_for_catchup($appname); # # - 1001, 1002, 1980 already exist from initial data copy # - INSERT (800, 'test 800') NO, because 800 is not > 1000 -# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered' +# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered', +# but row deleted after the update below. # - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered' # - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered' # - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered' @@ -232,7 +233,6 @@ $result = "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); is($result, qq(1001|test 1001 1002|test 1002 -1600|test 1600 1601|test 1601 updated 1980|not filtered), 'check replicated rows to table tab_rowfilter_1'); -- 1.8.3.1