From 36468d6120dbb66fb905bb4cf06f4f08eabc0c7d Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 4 Jan 2022 15:08:22 +1100 Subject: [PATCH v58] Row filter updates based on old/new tuples When applying row filter on updates, we need to check both old_tuple and new_tuple to decide how an update needs to be transformed. If both evaluations are true, it sends the UPDATE. If both evaluations are false, it doesn't send the UPDATE. If only one of the tuples matches the row filter expression, there is a data consistency issue. Fixing this issue requires a transformation. UPDATE Transformations: Case 1: old-row (no match) new-row (no match) -> (drop change) Case 2: old-row (no match) new-row (match) -> INSERT Case 3: old-row (match) new-row (no match) -> DELETE Case 4: old-row (match) new-row (match) -> UPDATE Also tuples that have been deformed will be cached in slots to avoid multiple deforming of tuples. Examples: Let's say the old tuple satisfies the row filter but the new tuple doesn't. Since the old tuple satisfies, the initial table synchronization copied this row (or another method was used to guarantee that there is data consistency). However, after the UPDATE the new tuple doesn't satisfy the row filter then, from the data consistency perspective, that row should be removed on the subscriber. The UPDATE should be transformed into a DELETE statement and be sent to the subscriber. Keeping this row on the subscriber is undesirable because it doesn't reflect what was defined in the row filter expression on the publisher. This row on the subscriber would likely not be modified by replication again. If someone inserted a new row with the same old identifier, replication could stop due to a constraint violation. Let's say the old tuple doesn't match the row filter but the new tuple does. Since the old tuple doesn't satisfy, the initial table synchronization probably didn't copy this row. 
However, after the UPDATE the new tuple does satisfies the row filter then, from the data consistency perspective, that row should inserted on the subscriber. The UPDATE should be transformed into a INSERT statement and be sent to the subscriber. Subsequent UPDATE or DELETE statements have no effect. However, this might surprise someone who expects the data set to satisfy the row filter expression on the provider. Author: Ajin Cherian --- src/backend/replication/logical/proto.c | 37 +- src/backend/replication/pgoutput/pgoutput.c | 666 +++++++++++++++++++--------- src/include/replication/logicalproto.h | 7 +- src/include/replication/reorderbuffer.h | 6 +- src/test/subscription/t/027_row_filter.pl | 55 ++- src/tools/pgindent/typedefs.list | 1 + 6 files changed, 548 insertions(+), 224 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b..1f72e17 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -15,6 +15,7 @@ #include "access/sysattr.h" #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" +#include "executor/executor.h" #include "libpq/pqformat.h" #include "replication/logicalproto.h" #include "utils/lsyscache.h" @@ -31,8 +32,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); - + HeapTuple tuple, TupleTableSlot *slot, + bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -410,7 +411,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, NULL, binary); } /* @@ -442,7 +443,8 @@ 
logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +465,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, oldslot, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, newslot, binary); } /* @@ -536,7 +538,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, NULL, binary); } /* @@ -749,13 +751,16 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. 
*/ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, TupleTableSlot *slot, + bool binary) { TupleDesc desc; - Datum values[MaxTupleAttributeNumber]; - bool isnull[MaxTupleAttributeNumber]; + Datum *values; + bool *isnull; int i; uint16 nliveatts = 0; + Datum attr_values[MaxTupleAttributeNumber]; + bool attr_isnull[MaxTupleAttributeNumber]; desc = RelationGetDescr(rel); @@ -771,7 +776,17 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar enlargeStringInfo(out, tuple->t_len + nliveatts * (1 + 4)); - heap_deform_tuple(tuple, desc, values, isnull); + if (TupIsNull(slot)) + { + values = attr_values; + isnull = attr_isnull; + heap_deform_tuple(tuple, desc, values, isnull); + } + else + { + values = slot->tts_values; + isnull = slot->tts_isnull; + } /* Write the values */ for (i = 0; i < desc->natts; i++) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7a76a1b..296ed4c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/tupconvert.h" +#include "access/xact.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" @@ -25,6 +26,7 @@ #include "parser/parse_coerce.h" #include "replication/logical.h" #include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "replication/pgoutput.h" #include "utils/builtins.h" @@ -139,7 +141,11 @@ typedef struct RelationSyncEntry #define NUM_ROWFILTER_PUBACTIONS 3 /* ExprState array for row filter. One per publication action. 
*/ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; - TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *scan_slot; /* tuple table slot for row filter */ + TupleTableSlot *new_slot; /* slot for storing deformed new tuple during + * updates */ + TupleTableSlot *old_slot; /* slot for storing deformed old tuple during + * updates */ /* * OID of the relation to publish changes as. For a partition, this may @@ -174,11 +180,15 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, /* row filter routines */ static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data, - Relation relation, HeapTuple oldtuple, - HeapTuple newtuple, RelationSyncEntry *entry); +static bool pgoutput_row_filter(enum ReorderBufferChangeType changetype, EState *relation, Oid relid, + HeapTuple oldtuple, HeapTuple newtuple, + TupleTableSlot *slot, RelationSyncEntry *entry); +static bool pgoutput_row_filter_update_check(enum ReorderBufferChangeType changetype, Relation relation, + HeapTuple oldtuple, HeapTuple newtuple, + RelationSyncEntry *entry, ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -742,27 +752,209 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) } /* - * Change is checked against the row filter, if any. + * Evaluates the row filter for old and new tuple. If both evaluations are + * true, it sends the UPDATE. If both evaluations are false, it doesn't send + * the UPDATE. If only one of the tuples matches the row filter expression, + * there is a data consistency issue. Fixing this issue requires a + * transformation. 
* - * If it returns true, the change is replicated, otherwise, it is not. + * Transformations: + * Updates are transformed to inserts and deletes based on the + * old_tuple and new_tuple. The new action is updated in the + * action parameter. If not updated, action remains as update. + * + * Case 1: old-row (no match) new-row (no match) -> (drop change) + * Case 2: old-row (no match) new row (match) -> INSERT + * Case 3: old-row (match) new-row (no match) -> DELETE + * Case 4: old-row (match) new row (match) -> UPDATE + * + * If the change is to be replicated this function returns true, else false. + * + * Examples: + * Let's say the old tuple satisfies the row filter but the new tuple doesn't. + * Since the old tuple satisfies, the initial table synchronization copied this + * row (or another method was used to guarantee that there is data + * consistency). However, after the UPDATE the new tuple doesn't satisfy the + * row filter then, from the data consistency perspective, that row should be + * removed on the subscriber. The UPDATE should be transformed into a DELETE + * statement and be sent to the subscriber. Keep this row on the subscriber is + * undesirable because it doesn't reflect what was defined in the row filter + * expression on the publisher. This row on the subscriber would likely not be + * modified by replication again. If someone inserted a new row with the same + * old identifier, replication could stop due to a constraint violation. + * + * Let's say the old tuple doesn't match the row filter but the new tuple does. + * Since the old tuple doesn't satisfy, the initial table synchronization + * probably didn't copy this row. However, after the UPDATE the new tuple does + * satisfies the row filter then, from the data consistency perspective, that + * row should inserted on the subscriber. The UPDATE should be transformed into + * a INSERT statement and be sent to the subscriber. 
Subsequent UPDATE or + * DELETE statements have no effect (it matches no row -- see + * apply_handle_update_internal()). However, this might surprise someone who + * expects the data set to satisfy the row filter expression on the provider. */ static bool -pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data, - Relation relation, HeapTuple oldtuple, HeapTuple newtuple, - RelationSyncEntry *entry) +pgoutput_row_filter_update_check(enum ReorderBufferChangeType changetype, Relation relation, + HeapTuple oldtuple, HeapTuple newtuple, + RelationSyncEntry *entry, ReorderBufferChangeType *action) { - EState *estate; - ExprContext *ecxt; - ListCell *lc; - bool result = true; - Oid relid = RelationGetRelid(relation); - List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */ - bool no_filter[] = {false, false, false}; /* One per pubaction */ + TupleDesc desc = RelationGetDescr(relation); + int i; + bool old_matched, + new_matched; + TupleTableSlot *tmp_new_slot, + *old_slot, + *new_slot; + EState *estate = NULL; Assert(changetype == REORDER_BUFFER_CHANGE_INSERT || changetype == REORDER_BUFFER_CHANGE_UPDATE || changetype == REORDER_BUFFER_CHANGE_DELETE); + /* Bail out if there is no row filter */ + if (!entry->exprstate[changetype]) + return true; + + /* update requires a new tuple */ + Assert(newtuple); + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + get_rel_name(relation->rd_id)); + + /* Clear the tuples */ + ExecClearTuple(entry->old_slot); + ExecClearTuple(entry->new_slot); + + estate = create_estate_for_relation(relation); + + /* + * If no old_tuple, then none of the replica identity columns changed and + * this would reduce to a simple update. 
+ */ + if (!oldtuple) + { + bool res; + + *action = REORDER_BUFFER_CHANGE_UPDATE; + res = pgoutput_row_filter(changetype, estate, + RelationGetRelid(relation), NULL, newtuple, + NULL, entry); + + FreeExecutorState(estate); + return res; + } + + old_slot = entry->old_slot; + new_slot = entry->new_slot; + tmp_new_slot = new_slot; + + heap_deform_tuple(newtuple, desc, new_slot->tts_values, new_slot->tts_isnull); + heap_deform_tuple(oldtuple, desc, old_slot->tts_values, old_slot->tts_isnull); + + ExecStoreVirtualTuple(old_slot); + ExecStoreVirtualTuple(new_slot); + + /* + * For updates, both the newtuple and oldtuple needs to be checked against + * the row-filter. The newtuple might not have all the replica identity + * columns, in which case it needs to be copied over from the oldtuple. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* if the column in the new_tuple is null, nothing to do */ + if (tmp_new_slot->tts_isnull[i]) + continue; + + /* + * Unchanged toasted replica identity columns are only detoasted in + * the old tuple, copy this over to the newtuple. + */ + if ((att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(tmp_new_slot->tts_values[i])) && + (!old_slot->tts_isnull[i] && + !(VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])))) + { + if (tmp_new_slot == new_slot) + { + tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);; + ExecClearTuple(tmp_new_slot); + ExecCopySlot(tmp_new_slot, new_slot); + } + + tmp_new_slot->tts_values[i] = old_slot->tts_values[i]; + } + + } + + old_matched = pgoutput_row_filter(changetype, estate, + RelationGetRelid(relation), NULL, NULL, + old_slot, entry); + new_matched = pgoutput_row_filter(changetype, estate, + RelationGetRelid(relation), NULL, NULL, + tmp_new_slot, entry); + + FreeExecutorState(estate); + + /* + * Case 1: if both tuples don't match the row filter, bail out. Send + * nothing. 
+ */ + if (!old_matched && !new_matched) + return false; + + /* + * Case 2: if the old tuple doesn't satisfy the row filter but the new + * tuple does, transform the UPDATE into INSERT. + * + * FIXME: (the below comment is from Euler's v50-0005 but it does not + * exactly match this current code, which AFAIK is just using the new + * tuple but not one that is transformed). This transformation requires + * another tuple. This transformed tuple will be used for INSERT. The new + * tuple is the base for the transformed tuple. However, the new tuple + * might not have column values from the replica identity. In this case, + * copy these values from the old tuple. + */ + if (!old_matched && new_matched) + *action = REORDER_BUFFER_CHANGE_INSERT; + + /* + * Case 3: if the old tuple satisfies the row filter but the new tuple + * doesn't, transform the UPDATE into DELETE. + * + * This transformation does not require another tuple. Old tuple will be + * used for DELETE. + */ + else if (old_matched && !new_matched) + *action = REORDER_BUFFER_CHANGE_DELETE; + + /* + * Case 4: if both tuples matches the row filter, transformation isn't + * required. Send the UPDATE. + */ + else if (new_matched && old_matched) + *action = REORDER_BUFFER_CHANGE_UPDATE; + + return true; +} + +/* + * Initialize the row filter, the first time. + */ +static void +pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry) +{ + ListCell *lc; + List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */ + bool no_filter[] = {false, false, false}; /* One per pubaction */ + MemoryContext oldctx; + int idx; + bool found_filters = false; + int idx_ins = REORDER_BUFFER_CHANGE_INSERT; + int idx_upd = REORDER_BUFFER_CHANGE_UPDATE; + int idx_del = REORDER_BUFFER_CHANGE_DELETE; + /* * If the row filter caching is currently flagged "invalid" then it means * we don't know yet if there is/isn't any row filters for this relation. 
@@ -783,207 +975,221 @@ pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data, * necessary at all. So the decision was to defer this logic to last * moment when we know it will be needed. */ - if (!entry->exprstate_valid) + if (entry->exprstate_valid) + return; + + /* + * Find if there are any row filters for this relation. If there are, then + * prepare the necessary ExprState and cache it in entry->exprstate. + * + * NOTE: All publication-table mappings must be checked. + * + * NOTE: If the relation is a partition and pubviaroot is true, use the + * row filter of the topmost partitioned table instead of the row filter + * of its own partition. + * + * NOTE: Multiple publications might have multiple row filters for this + * relation. Since row filter usage depends on the DML operation, there + * are multiple lists (one for each operation) which row filters will be + * appended. + * + * NOTE: FOR ALL TABLES implies "don't use row filter expression" so it + * takes precedence. + * + * NOTE: ALL TABLES IN SCHEMA implies "don't use row filter expression" if + * the schema is the same as the table schema. + */ + foreach(lc, data->publications) { - MemoryContext oldctx; - int idx; - bool found_filters = false; - int idx_ins = REORDER_BUFFER_CHANGE_INSERT; - int idx_upd = REORDER_BUFFER_CHANGE_UPDATE; - int idx_del = REORDER_BUFFER_CHANGE_DELETE; + Publication *pub = lfirst(lc); + HeapTuple rftuple; + Datum rfdatum; + bool rfisnull; + List *schemarelids = NIL; +#define SET_NO_FILTER_FOR_CURRENT_PUBACTIONS \ + if (pub->pubactions.pubinsert) \ + no_filter[idx_ins] = true; \ + if (pub->pubactions.pubupdate) \ + no_filter[idx_upd] = true; \ + if (pub->pubactions.pubdelete) \ + no_filter[idx_del] = true /* - * Find if there are any row filters for this relation. If there are, - * then prepare the necessary ExprState and cache it in - * entry->exprstate. - * - * NOTE: All publication-table mappings must be checked. 
- * - * NOTE: If the relation is a partition and pubviaroot is true, use - * the row filter of the topmost partitioned table instead of the row - * filter of its own partition. - * - * NOTE: Multiple publications might have multiple row filters for - * this relation. Since row filter usage depends on the DML operation, - * there are multiple lists (one for each operation) which row filters - * will be appended. - * - * NOTE: FOR ALL TABLES implies "don't use row filter expression" so - * it takes precedence. - * - * NOTE: ALL TABLES IN SCHEMA implies "don't use row filter - * expression" if the schema is the same as the table schema. + * If the publication is FOR ALL TABLES then it is treated the same as + * if this table has no row filters (even if for other publications it + * does). */ - foreach(lc, data->publications) + if (pub->alltables) { - Publication *pub = lfirst(lc); - HeapTuple rftuple; - Datum rfdatum; - bool rfisnull; - List *schemarelids = NIL; -#define SET_NO_FILTER_FOR_CURRENT_PUBACTIONS \ - if (pub->pubactions.pubinsert) \ - no_filter[idx_ins] = true; \ - if (pub->pubactions.pubupdate) \ - no_filter[idx_upd] = true; \ - if (pub->pubactions.pubdelete) \ - no_filter[idx_del] = true + SET_NO_FILTER_FOR_CURRENT_PUBACTIONS; - /* - * If the publication is FOR ALL TABLES then it is treated the - * same as if this table has no row filters (even if for other - * publications it does). - */ - if (pub->alltables) - { - SET_NO_FILTER_FOR_CURRENT_PUBACTIONS; + /* Quick exit loop if all pubactions have no row filter. */ + if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del]) + break; - /* Quick exit loop if all pubactions have no row filter. */ - if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del]) - break; + /* No additional work for this publication. Next one. */ + continue; + } - /* No additional work for this publication. Next one. 
*/ - continue; - } + /* + * If the publication is FOR ALL TABLES IN SCHEMA and it overlaps with + * the current relation in the same schema then this is also treated + * same as if this table has no row filters (even if for other + * publications it does). + */ + schemarelids = GetAllSchemaPublicationRelations(pub->oid, + pub->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + if (list_member_oid(schemarelids, entry->relid)) + { + SET_NO_FILTER_FOR_CURRENT_PUBACTIONS; - /* - * If the publication is FOR ALL TABLES IN SCHEMA and it overlaps - * with the current relation in the same schema then this is also - * treated same as if this table has no row filters (even if for - * other publications it does). - */ - schemarelids = GetAllSchemaPublicationRelations(pub->oid, - pub->pubviaroot ? - PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); - if (list_member_oid(schemarelids, entry->relid)) - { - SET_NO_FILTER_FOR_CURRENT_PUBACTIONS; + list_free(schemarelids); - list_free(schemarelids); + /* Quick exit loop if all pubactions have no row filter. */ + if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del]) + break; - /* Quick exit loop if all pubactions have no row filter. */ - if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del]) - break; + /* No additional work for this publication. Next one. */ + continue; + } + list_free(schemarelids); - /* No additional work for this publication. Next one. */ - continue; - } - list_free(schemarelids); + /* + * Lookup if there is a row filter, and if yes remember it in a list + * (per pubaction). If no, then remember there was no filter for this + * pubaction. Code following this 'publications' loop will combine all + * filters. 
+ */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(entry->publish_as_relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rftuple)) + { + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); - /* - * Lookup if there is a row filter, and if yes remember it in a - * list (per pubaction). If no, then remember there was no filter - * for this pubaction. Code following this 'publications' loop - * will combine all filters. - */ - rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(entry->publish_as_relid), ObjectIdGetDatum(pub->oid)); - if (HeapTupleIsValid(rftuple)) + if (!rfisnull) { - rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); + Node *rfnode; - if (!rfisnull) + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + /* Gather the rfnodes per pubaction of this publiaction. */ + if (pub->pubactions.pubinsert) { - Node *rfnode; - - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - /* Gather the rfnodes per pubaction of this publiaction. */ - if (pub->pubactions.pubinsert) - { - rfnode = stringToNode(TextDatumGetCString(rfdatum)); - rfnodes[idx_ins] = lappend(rfnodes[idx_ins], rfnode); - } - if (pub->pubactions.pubupdate) - { - rfnode = stringToNode(TextDatumGetCString(rfdatum)); - rfnodes[idx_upd] = lappend(rfnodes[idx_upd], rfnode); - } - if (pub->pubactions.pubdelete) - { - rfnode = stringToNode(TextDatumGetCString(rfdatum)); - rfnodes[idx_del] = lappend(rfnodes[idx_del], rfnode); - } - MemoryContextSwitchTo(oldctx); + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + rfnodes[idx_ins] = lappend(rfnodes[idx_ins], rfnode); } - else + if (pub->pubactions.pubupdate) { - /* Remember which pubactions have no row filter. */ - SET_NO_FILTER_FOR_CURRENT_PUBACTIONS; - - /* Quick exit loop if all pubactions have no row filter. 
*/ - if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del]) - { - ReleaseSysCache(rftuple); - break; - } + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + rfnodes[idx_upd] = lappend(rfnodes[idx_upd], rfnode); } - - ReleaseSysCache(rftuple); + if (pub->pubactions.pubdelete) + { + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + rfnodes[idx_del] = lappend(rfnodes[idx_del], rfnode); + } + MemoryContextSwitchTo(oldctx); } - - } /* loop all subscribed publications */ - - /* - * Now all the filters for all pubactions are known. Combine them when - * their pubactions are same. - * - * All row filter expressions will be discarded if there is one - * publication-relation entry without a row filter. That's because all - * expressions are aggregated by the OR operator. The row filter - * absence means replicate all rows so a single valid expression means - * publish this row. - */ - for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) - { - int n_filters; - - if (no_filter[idx]) + else { - if (rfnodes[idx]) + /* Remember which pubactions have no row filter. */ + SET_NO_FILTER_FOR_CURRENT_PUBACTIONS; + + /* Quick exit loop if all pubactions have no row filter. */ + if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del]) { - list_free_deep(rfnodes[idx]); - rfnodes[idx] = NIL; + ReleaseSysCache(rftuple); + break; } } - /* - * If there was one or more filter for this pubaction then combine - * them (if necessary) and cache the ExprState. - */ - n_filters = list_length(rfnodes[idx]); - if (n_filters > 0) - { - Node *rfnode; + ReleaseSysCache(rftuple); + } - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - rfnode = n_filters > 1 ? makeBoolExpr(OR_EXPR, rfnodes[idx], -1) : linitial(rfnodes[idx]); - entry->exprstate[idx] = pgoutput_row_filter_init_expr(rfnode); - MemoryContextSwitchTo(oldctx); + } /* loop all subscribed publications */ + + /* + * Now all the filters for all pubactions are known. 
Combine them when + * their pubactions are same. + * + * All row filter expressions will be discarded if there is one + * publication-relation entry without a row filter. That's because all + * expressions are aggregated by the OR operator. The row filter absence + * means replicate all rows so a single valid expression means publish + * this row. + */ + for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) + { + int n_filters; - found_filters = true; /* flag that we will need slots made */ + if (no_filter[idx]) + { + if (rfnodes[idx]) + { + list_free_deep(rfnodes[idx]); + rfnodes[idx] = NIL; } - } /* for each pubaction */ + } - if (found_filters) + /* + * If there was one or more filter for this pubaction then combine + * them (if necessary) and cache the ExprState. + */ + n_filters = list_length(rfnodes[idx]); + if (n_filters > 0) { - TupleDesc tupdesc = RelationGetDescr(relation); + Node *rfnode; - /* - * Create tuple table slots for row filter. Create a copy of the - * TupleDesc as it needs to live as long as the cache remains. - */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); - tupdesc = CreateTupleDescCopy(tupdesc); - entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + rfnode = n_filters > 1 ? makeBoolExpr(OR_EXPR, rfnodes[idx], -1) : linitial(rfnodes[idx]); + entry->exprstate[idx] = pgoutput_row_filter_init_expr(rfnode); MemoryContextSwitchTo(oldctx); + + found_filters = true; /* flag that we will need slots made */ } + } /* for each pubaction */ + + if (found_filters) + { + TupleDesc tupdesc = RelationGetDescr(relation); - entry->exprstate_valid = true; + /* + * Create tuple table slots for row filter. Create a copy of the + * TupleDesc as it needs to live as long as the cache remains. 
+ */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + tupdesc = CreateTupleDescCopy(tupdesc); + entry->scan_slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + entry->old_slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->new_slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + MemoryContextSwitchTo(oldctx); } - /* Bail out if there is no row filter */ - if (!entry->exprstate[changetype]) - return true; + entry->exprstate_valid = true; +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter(enum ReorderBufferChangeType changetype, EState *estate, Oid relid, + HeapTuple oldtuple, HeapTuple newtuple, TupleTableSlot *slot, + RelationSyncEntry *entry) +{ + ExprContext *ecxt; + bool result = true; + + Assert(changetype == REORDER_BUFFER_CHANGE_INSERT || + changetype == REORDER_BUFFER_CHANGE_UPDATE || + changetype == REORDER_BUFFER_CHANGE_DELETE); + + /* + * The check for existence of a filter (for this operation) is already + * made before calling this function. + */ + Assert(entry->exprstate[changetype] != NULL); if (message_level_is_interesting(DEBUG3)) elog(DEBUG3, "table \"%s.%s\" has row filter", @@ -992,13 +1198,21 @@ pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data, PushActiveSnapshot(GetTransactionSnapshot()); - estate = create_estate_for_relation(relation); - /* Prepare context per tuple */ ecxt = GetPerTupleExprContext(estate); - ecxt->ecxt_scantuple = entry->scantuple; + ecxt->ecxt_scantuple = entry->scan_slot; - ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); + /* + * The default behavior for UPDATEs is to use the new tuple for row + * filtering. If the UPDATE requires a transformation, the new tuple will + * be replaced by the transformed tuple before calling this routine. 
+ */ + if (newtuple || oldtuple) + ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); + else + { + ecxt->ecxt_scantuple = slot; + } /* * NOTE: Multiple publication row filters have already been combined to a @@ -1010,9 +1224,6 @@ pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data, result = pgoutput_row_filter_exec_expr(entry->exprstate[changetype], ecxt); } - /* Cleanup allocated resources */ - ResetExprContext(ecxt); - FreeExecutorState(estate); PopActiveSnapshot(); return result; @@ -1032,6 +1243,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + EState *estate = NULL; if (!is_publishable_relation(relation)) return; @@ -1069,6 +1281,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Initialize the row_filter */ + pgoutput_row_filter_init(data, relation, relentry); + /* Send the data */ switch (change->action) { @@ -1076,10 +1291,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { HeapTuple tuple = &change->data.tp.newtuple->tuple; - /* Check row filter. */ - if (!pgoutput_row_filter(change->action, data, relation, NULL, tuple, relentry)) - break; - /* * Schema should be sent before the logic that replaces the * relation because it also sends the ancestor's relation. @@ -1097,6 +1308,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, tuple = execute_attr_map_tuple(tuple, relentry->map); } + if (relentry->exprstate[change->action]) + { + estate = create_estate_for_relation(relation); + + /* Check row filter. 
*/ + if (!pgoutput_row_filter(change->action, estate, + RelationGetRelid(relation), NULL, + tuple, NULL, relentry)) + break; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, data->binary); @@ -1108,10 +1330,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; HeapTuple newtuple = &change->data.tp.newtuple->tuple; - - /* Check row filter. */ - if (!pgoutput_row_filter(change->action, data, relation, oldtuple, newtuple, relentry)) - break; + ReorderBufferChangeType modified_action = REORDER_BUFFER_CHANGE_UPDATE; maybe_send_schema(ctx, change, relation, relentry); @@ -1132,9 +1351,34 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } } + /* Check row filter */ + if (!pgoutput_row_filter_update_check(change->action, relation, + oldtuple, newtuple, relentry, + &modified_action)) + break; + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + + switch (modified_action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, relation, newtuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + logicalrep_write_update(ctx->out, xid, relation, + oldtuple, newtuple, relentry->old_slot, + relentry->new_slot, + data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, + data->binary); + break; + default: + Assert(false); + } + OutputPluginWrite(ctx, true); break; } @@ -1143,10 +1387,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; - /* Check row filter. */ - if (!pgoutput_row_filter(change->action, data, relation, oldtuple, NULL, relentry)) - break; - maybe_send_schema(ctx, change, relation, relentry); /* Switch relation if publishing via root. 
*/ @@ -1160,6 +1400,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); } + if (relentry->exprstate[change->action]) + { + estate = create_estate_for_relation(relation); + + /* Check row filter. */ + if (!pgoutput_row_filter(change->action, estate, + RelationGetRelid(relation), oldtuple, + NULL, NULL, relentry)) + break; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_delete(ctx->out, xid, relation, oldtuple, data->binary); @@ -1178,6 +1429,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ancestor = NULL; } + if (estate) + FreeExecutorState(estate); + /* Cleanup */ MemoryContextSwitchTo(old); MemoryContextReset(data->context); @@ -1561,7 +1815,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->exprstate_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; - entry->scantuple = NULL; + entry->scan_slot = NULL; + entry->new_slot = NULL; + entry->old_slot = NULL; entry->exprstate[REORDER_BUFFER_CHANGE_INSERT] = NULL; entry->exprstate[REORDER_BUFFER_CHANGE_UPDATE] = NULL; entry->exprstate[REORDER_BUFFER_CHANGE_DELETE] = NULL; @@ -1772,10 +2028,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) * Row filter cache cleanups. (Will be rebuilt later if needed). */ entry->exprstate_valid = false; - if (entry->scantuple != NULL) + if (entry->scan_slot != NULL) { - ExecDropSingleTupleTableSlot(entry->scantuple); - entry->scantuple = NULL; + ExecDropSingleTupleTableSlot(entry->scan_slot); + entry->scan_slot = NULL; } /* Cleanup the ExprState for each of the pubactions. 
*/ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dc..9df9260 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -16,6 +16,7 @@ #include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" +#include "executor/executor.h" /* * Protocol capabilities @@ -211,7 +212,11 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid, extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary); +extern void logicalrep_write_update_cached(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldtuple, TupleTableSlot *newtuple, + bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b40ff7..aec0059 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,7 +51,7 @@ typedef struct ReorderBufferTupleBuf * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of * logical decoding don't have to care about these. 
*/ -enum ReorderBufferChangeType +typedef enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, @@ -65,7 +65,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE -}; +} ReorderBufferChangeType; /* forward declaration */ struct ReorderBufferTXN; @@ -82,7 +82,7 @@ typedef struct ReorderBufferChange XLogRecPtr lsn; /* The type of change. */ - enum ReorderBufferChangeType action; + ReorderBufferChangeType action; /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; diff --git a/src/test/subscription/t/027_row_filter.pl b/src/test/subscription/t/027_row_filter.pl index abeaf76..81a1374 100644 --- a/src/test/subscription/t/027_row_filter.pl +++ b/src/test/subscription/t/027_row_filter.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; -use Test::More tests => 14; +use Test::More tests => 15; # create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); @@ -150,6 +150,10 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)" ); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_toast (a text PRIMARY KEY, b text)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_rowfilter_toast ALTER COLUMN a SET STORAGE EXTERNAL"); # setup structure on subscriber $node_subscriber->safe_psql('postgres', @@ -174,6 +178,8 @@ $node_subscriber->safe_psql('postgres', $node_subscriber->safe_psql('postgres', "ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)" ); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_toast (a text PRIMARY KEY, b text)"); # setup logical replication 
$node_publisher->safe_psql('postgres', @@ -208,6 +214,8 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub_4b FOR TABLE tab_rowfilter_4" ); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_toast FOR TABLE tab_rowfilter_toast WHERE (a = repeat('1234567890', 200))"); # # The following INSERTs are executed before the CREATE SUBSCRIPTION, so these @@ -237,8 +245,11 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 103)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_toast(a, b) VALUES(repeat('1234567890', 200), '1234567890')"); + $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_toast" ); $node_publisher->wait_for_catchup($appname); @@ -325,6 +336,14 @@ $result = is($result, qq(15000|102 16000|103), 'check initial data copy from partition tab_rowfilter_greater_10k'); +# Check expected replicated rows for tab_rowfilter_toast +# tab_rowfilter_toast filter: (a = repeat('1234567890', 200)) +# INSERT (repeat('1234567890', 200) ,'1234567890') YES +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a = repeat('1234567890', 200), b FROM tab_rowfilter_toast"); +is($result, qq(t|1234567890), 'check initial data copy from table tab_rowfilter_toast'); + # The following commands are executed after CREATE SUBSCRIPTION, so these SQL # commands are for testing normal logical replication behavior. 
# @@ -336,12 +355,16 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1601, 'test 1601')"); $node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1602, 'filtered')"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1700, 'test 1700')"); $node_publisher->safe_psql('postgres', "UPDATE tab_rowfilter_1 SET b = NULL WHERE a = 1600"); $node_publisher->safe_psql('postgres', "UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601"); $node_publisher->safe_psql('postgres', + "UPDATE tab_rowfilter_1 SET b = 'test 1602 updated' WHERE a = 1602"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rowfilter_1 WHERE a = 1700"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_2 (c) VALUES (21), (22), (23), (24), (25)"); @@ -383,11 +406,14 @@ is($result, qq(13|0|12), 'check replicated rows to tab_rowfilter_4'); # # - 1001, 1002, 1980 already exist from initial data copy # - INSERT (800, 'test 800') NO, because 800 is not > 1000 -# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered' +# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered', +# but row deleted after the update below. 
# - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered' +# - INSERT (1602, 'filtered') NO, because b == 'filtered' # - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered' # - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered' # - UPDATE (1601, 'test 1601 updated') YES, because 1601 > 1000 and 'test 1601 updated' <> 'filtered' +# - UPDATE (1602, 'test 1602 updated') YES, because 1602 > 1000 and 'test 1602 updated' <> 'filtered' # - DELETE (1700) YES, because 1700 > 1000 and 'test 1700' <> 'filtered' # $result = @@ -395,8 +421,8 @@ $result = "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); is($result, qq(1001|test 1001 1002|test 1002 -1600|test 1600 1601|test 1601 updated +1602|test 1602 updated 1980|not filtered), 'check replicated rows to table tab_rowfilter_1'); # Publish using root partitioned table @@ -458,5 +484,26 @@ is( $result, qq(1|100 4001|30 4500|450), 'check publish_via_partition_root behavior'); +# FIXME: Currently, if replica identity is set to key and the key is not +# modified we don't log the key separately, because it should be logged along with +# the updated tuple. But if the key is stored externally we must +# detoast and log it separately. The patch to fix the bug is still pending[1]; +# the following tests for unchanged toasted key column would fail without +# applying the bug fix patch. So temporarily keep the following tests +# commented out until the bug fix patch is committed.
+# [1] https://postgr.es/m/OS0PR01MB611342D0A92D4F4BF26C0F47FB229@OS0PR01MB6113.jpnprd01.prod.outlook.com + +# UPDATE the non-key column for table tab_rowfilter_toast +#$node_publisher->safe_psql('postgres', +# "UPDATE tab_rowfilter_toast SET b = '1'"); + +# Check expected replicated rows for tab_rowfilter_toast +# tab_rowfilter_toast filter: (a = repeat('1234567890', 200)) +# UPDATE (repeat('1234567890', 200) ,'1') YES +#$result = +# $node_subscriber->safe_psql('postgres', +# "SELECT a = repeat('1234567890', 200), b FROM tab_rowfilter_toast"); +#is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0c523bf..4aa9f58 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2198,6 +2198,7 @@ ReorderBufferApplyChangeCB ReorderBufferApplyTruncateCB ReorderBufferBeginCB ReorderBufferChange +ReorderBufferChangeType ReorderBufferCommitCB ReorderBufferCommitPreparedCB ReorderBufferDiskChange -- 1.8.3.1