From 44f401ecb53dd43e3a7a05912e25d644748f936f Mon Sep 17 00:00:00 2001 From: Euler Taveira Date: Tue, 27 Feb 2018 04:03:13 +0000 Subject: [PATCH 5/8] Row filtering for logical replication When you define or modify a publication you optionally can filter rows to be published using a WHERE condition. This condition is any expression that evaluates to boolean. Only those rows that satisfy the WHERE condition will be sent to subscribers. --- doc/src/sgml/catalogs.sgml | 9 +++ doc/src/sgml/ref/alter_publication.sgml | 11 ++- doc/src/sgml/ref/create_publication.sgml | 26 +++++- src/backend/catalog/pg_publication.c | 102 ++++++++++++++++++++++-- src/backend/commands/publicationcmds.c | 93 +++++++++++++++------- src/backend/parser/gram.y | 26 ++++-- src/backend/parser/parse_agg.c | 10 +++ src/backend/parser/parse_expr.c | 14 +++- src/backend/parser/parse_func.c | 3 + src/backend/replication/logical/tablesync.c | 119 +++++++++++++++++++++++++--- src/backend/replication/logical/worker.c | 2 +- src/backend/replication/pgoutput/pgoutput.c | 100 ++++++++++++++++++++++- src/include/catalog/pg_publication.h | 9 ++- src/include/catalog/pg_publication_rel.h | 10 ++- src/include/catalog/toasting.h | 1 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 11 ++- src/include/parser/parse_node.h | 1 + src/include/replication/logicalrelation.h | 2 + src/test/regress/expected/publication.out | 29 +++++++ src/test/regress/sql/publication.sql | 21 +++++ src/test/subscription/t/013_row_filter.pl | 96 ++++++++++++++++++++++ 22 files changed, 629 insertions(+), 67 deletions(-) create mode 100644 src/test/subscription/t/013_row_filter.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 5e71a2e865..7f11225f65 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5595,6 +5595,15 @@ SCRAM-SHA-256$<iteration count>:&l pg_class.oid Reference to relation + + + prqual + pg_node_tree + + Expression tree (in the form of a + nodeToString() representation) for the relation's + qualifying condition + diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index 534e598d93..9608448207 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -21,8 +21,8 @@ PostgreSQL documentation -ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [, ...] -ALTER PUBLICATION name SET TABLE [ ONLY ] table_name [ * ] [, ...] +ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ...] +ALTER PUBLICATION name SET TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ...] ALTER PUBLICATION name DROP TABLE [ ONLY ] table_name [ * ] [, ...] ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] ) ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER } @@ -91,7 +91,12 @@ ALTER PUBLICATION name RENAME TO ONLY is not specified, the table and all its descendant tables (if any) are affected. Optionally, * can be specified after the table - name to explicitly indicate that descendant tables are included. + name to explicitly indicate that descendant tables are included. If the + optional WHERE clause is specified, rows that do not + satisfy the expression will + not be published. Note that parentheses are required around the + expression. The expression + is executed with the role used for the replication connection. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..6e99943374 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation CREATE PUBLICATION name - [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] + [ FOR TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ] @@ -68,7 +68,10 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. If the optional + WHERE clause is specified, rows that do not satisfy + the expression will not be + published. Note that parentheses are required around the expression. @@ -157,6 +160,13 @@ CREATE PUBLICATION name + Columns used in the WHERE clause must be part of the + primary key or be covered by REPLICA IDENTITY otherwise + UPDATE and DELETE operations will not + be replicated. + + + For an INSERT ... ON CONFLICT command, the publication will publish the operation that actually results from the command. So depending of the outcome, it may be published as either INSERT or @@ -171,6 +181,11 @@ CREATE PUBLICATION name DDL operations are not published. + + + The WHERE clause expression is executed with the role used + for the replication connection. + @@ -184,6 +199,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments; + Create a publication that publishes all changes from active departments: + +CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE); + + + + Create a publication that publishes all changes in all tables: CREATE PUBLICATION alltables FOR ALL TABLES; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index fd5da7d5f7..c873419a9e 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -34,6 +34,10 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_relation.h" + #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -149,18 +153,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, Relation targetrel, +publication_add_relation(Oid pubid, PublicationRelationQual *targetrel, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel); + Oid relid = RelationGetRelid(targetrel->relation); Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; + ParseState *pstate; + RangeTblEntry *rte; + Node *whereclause; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -180,10 +187,27 @@ publication_add_relation(Oid pubid, Relation targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel), pub->name))); + RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel); + check_publication_add_relation(targetrel->relation); + + /* Set up a pstate to parse with */ + pstate = make_parsestate(NULL); + pstate->p_sourcetext = nodeToString(targetrel->whereClause); + + rte = addRangeTableEntryForRelation(pstate, targetrel->relation, + AccessShareLock, + NULL, false, false); + addRTEtoQuery(pstate, rte, false, true, true); + + whereclause = transformWhereClause(pstate, + copyObject(targetrel->whereClause), + EXPR_KIND_PUBLICATION_WHERE, + "PUBLICATION"); + + /* Fix up collation information */ + assign_expr_collations(pstate, whereclause); /* Form a tuple. */ memset(values, 0, sizeof(values)); @@ -197,6 +221,12 @@ publication_add_relation(Oid pubid, Relation targetrel, values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + /* Add qualifications, if available */ + if (whereclause) + values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause)); + else + nulls[Anum_pg_publication_rel_prqual - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -213,11 +243,17 @@ publication_add_relation(Oid pubid, Relation targetrel, ObjectAddressSet(referenced, RelationRelationId, relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* Add dependency on the objects mentioned in the qualifications */ + if (whereclause) + recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL); + + free_parsestate(pstate); + /* Close the table. */ table_close(rel, RowExclusiveLock); /* Invalidate relcache so that publication info is rebuilt. */ - CacheInvalidateRelcache(targetrel); + CacheInvalidateRelcache(targetrel->relation); return myself; } @@ -292,6 +328,62 @@ GetPublicationRelations(Oid pubid) } /* + * Gets list of PublicationRelationQuals for a publication. + */ +List * +GetPublicationRelationQuals(Oid pubid) +{ + List *result; + Relation pubrelsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the relation. */ + pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_rel_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, + true, NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_rel pubrel; + PublicationRelationQual *relqual; + Datum value_datum; + char *qual_value; + Node *qual_expr; + bool isnull; + + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + + value_datum = heap_getattr(tup, Anum_pg_publication_rel_prqual, RelationGetDescr(pubrelsrel), &isnull); + if (!isnull) + { + qual_value = TextDatumGetCString(value_datum); + qual_expr = (Node *) stringToNode(qual_value); + } + else + qual_expr = NULL; + + relqual = palloc(sizeof(PublicationRelationQual)); + relqual->relation = table_open(pubrel->prrelid, ShareUpdateExclusiveLock); + relqual->whereClause = copyObject(qual_expr); + result = lappend(result, relqual); + } + + systable_endscan(scan); + heap_close(pubrelsrel, AccessShareLock); + + return result; +} + +/* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ List * diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index f115d4bf80..716ed2ec58 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -352,6 +352,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, Assert(list_length(stmt->tables) > 0); + /* + * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause. Use + * publication_table_list node (that accepts a WHERE clause) but forbid the + * WHERE clause in it. The use of relation_expr_list node just for the + * DROP TABLE part does not worth the trouble. + */ + if (stmt->tableAction == DEFELEM_DROP) + { + ListCell *lc; + + foreach(lc, stmt->tables) + { + PublicationTable *t = lfirst(lc); + if (t->whereClause) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use a WHERE clause for removing table from publication \"%s\"", + NameStr(pubform->pubname)))); + } + } + rels = OpenTableList(stmt->tables); if (stmt->tableAction == DEFELEM_ADD) @@ -360,47 +381,55 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { - List *oldrelids = GetPublicationRelations(pubid); + List *oldrels = GetPublicationRelationQuals(pubid); List *delrels = NIL; ListCell *oldlc; /* Calculate which relations to drop. */ - foreach(oldlc, oldrelids) + foreach(oldlc, oldrels) { - Oid oldrelid = lfirst_oid(oldlc); + PublicationRelationQual *oldrel = lfirst(oldlc); + PublicationRelationQual *newrel; ListCell *newlc; bool found = false; foreach(newlc, rels) { - Relation newrel = (Relation) lfirst(newlc); + newrel = (PublicationRelationQual *) lfirst(newlc); - if (RelationGetRelid(newrel) == oldrelid) + if (RelationGetRelid(newrel->relation) == RelationGetRelid(oldrel->relation)) { found = true; break; } } - if (!found) + /* + * Remove publication / relation mapping iif (i) table is not found in + * the new list or (ii) table is found in the new list, however, + * its qual does not match the old one (in this case, a simple + * tuple update is not enough because of the dependencies). + */ + if (!found || (found && !equal(oldrel->whereClause, newrel->whereClause))) { - Relation oldrel = table_open(oldrelid, - ShareUpdateExclusiveLock); + PublicationRelationQual *oldrelqual = palloc(sizeof(PublicationRelationQual)); + oldrelqual->relation = table_open(RelationGetRelid(oldrel->relation), + ShareUpdateExclusiveLock); - delrels = lappend(delrels, oldrel); + delrels = lappend(delrels, oldrelqual); } } /* And drop them. */ PublicationDropTables(pubid, delrels, true); + CloseTableList(oldrels); + CloseTableList(delrels); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ PublicationAddTables(pubid, rels, true, stmt); - - CloseTableList(delrels); } CloseTableList(rels); @@ -510,16 +539,18 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + PublicationRelationQual *relqual; /* * Open, share-lock, and check all the explicitly-specified relations */ foreach(lc, tables) { - RangeVar *rv = castNode(RangeVar, lfirst(lc)); - bool recurse = rv->inh; - Relation rel; - Oid myrelid; + PublicationTable *t = lfirst(lc); + RangeVar *rv = castNode(RangeVar, t->relation); + bool recurse = rv->inh; + Relation rel; + Oid myrelid; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); @@ -539,8 +570,10 @@ OpenTableList(List *tables) table_close(rel, ShareUpdateExclusiveLock); continue; } - - rels = lappend(rels, rel); + relqual = palloc(sizeof(PublicationRelationQual)); + relqual->relation = rel; + relqual->whereClause = t->whereClause; + rels = lappend(rels, relqual); relids = lappend_oid(relids, myrelid); /* Add children of this rel, if requested */ @@ -568,7 +601,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - rels = lappend(rels, rel); + relqual = palloc(sizeof(PublicationRelationQual)); + relqual->relation = rel; + /* child inherits WHERE clause from parent */ + relqual->whereClause = t->whereClause; + rels = lappend(rels, relqual); relids = lappend_oid(relids, childrelid); } } @@ -589,10 +626,12 @@ CloseTableList(List *rels) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc); - table_close(rel, NoLock); + table_close(rel->relation, NoLock); } + + list_free_deep(rels); } /* @@ -608,13 +647,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); + PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc); ObjectAddress obj; /* Must be owner of the table or superuser. */ - if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), - RelationGetRelationName(rel)); + if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind), + RelationGetRelationName(rel->relation)); obj = publication_add_relation(pubid, rel, if_not_exists); if (stmt) @@ -640,8 +679,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - Relation rel = (Relation) lfirst(lc); - Oid relid = RelationGetRelid(rel); + PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc); + Oid relid = RelationGetRelid(rel->relation); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), @@ -654,7 +693,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" is not part of the publication", - RelationGetRelationName(rel)))); + RelationGetRelationName(rel->relation)))); } ObjectAddressSet(obj, PublicationRelRelationId, prid); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 1de8f56794..bd87e80e1b 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -404,13 +404,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); relation_expr_list dostmt_opt_list transform_element_list transform_type_list TriggerTransitions TriggerReferencing - publication_name_list + publication_name_list publication_table_list vacuum_relation_list opt_vacuum_relation_list %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables +%type opt_publication_for_tables publication_for_tables publication_table_elem %type publication_name_item %type opt_fdw_options fdw_options @@ -9518,7 +9518,7 @@ opt_publication_for_tables: ; publication_for_tables: - FOR TABLE relation_expr_list + FOR TABLE publication_table_list { $$ = (Node *) $3; } @@ -9549,7 +9549,7 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE relation_expr_list + | ALTER PUBLICATION name ADD_P TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9557,7 +9557,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE relation_expr_list + | ALTER PUBLICATION name SET TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9565,7 +9565,7 @@ AlterPublicationStmt: n->tableAction = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE relation_expr_list + | ALTER PUBLICATION name DROP TABLE publication_table_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; @@ -9575,6 +9575,20 @@ AlterPublicationStmt: } ; +publication_table_list: + publication_table_elem { $$ = list_make1($1); } + | publication_table_list ',' publication_table_elem { $$ = lappend($1, $3); } + ; + +publication_table_elem: relation_expr OptWhereClause + { + PublicationTable *n = makeNode(PublicationTable); + n->relation = $1; + n->whereClause = $2; + $$ = (Node *) n; + } + ; + /***************************************************************************** * * CREATE SUBSCRIPTION name ... diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index f418c61545..dea5aadca7 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -544,6 +544,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr) err = _("grouping operations are not allowed in COPY FROM WHERE conditions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + if (isAgg) + err = _("aggregate functions are not allowed in publication WHERE expressions"); + else + err = _("grouping operations are not allowed in publication WHERE expressions"); + + break; /* * There is intentionally no default: case here, so that the @@ -933,6 +940,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc, case EXPR_KIND_GENERATED_COLUMN: err = _("window functions are not allowed in column generation expressions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("window functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 76f3dd7076..6d2c6a28ea 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -170,6 +170,13 @@ transformExprRecurse(ParseState *pstate, Node *expr) /* Guard against stack overflow due to overly complex expressions */ check_stack_depth(); + /* Functions are not allowed in publication WHERE clauses */ + if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && nodeTag(expr) == T_FuncCall) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions are not allowed in WHERE"), + parser_errposition(pstate, exprLocation(expr)))); + switch (nodeTag(expr)) { case T_ColumnRef: @@ -571,6 +578,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) case EXPR_KIND_CALL_ARGUMENT: case EXPR_KIND_COPY_WHERE: case EXPR_KIND_GENERATED_COLUMN: + case EXPR_KIND_PUBLICATION_WHERE: /* okay */ break; @@ -1924,13 +1932,15 @@ transformSubLink(ParseState *pstate, SubLink *sublink) break; case EXPR_KIND_CALL_ARGUMENT: err = _("cannot use subquery in CALL argument"); - break; case EXPR_KIND_COPY_WHERE: err = _("cannot use subquery in COPY FROM WHERE condition"); break; case EXPR_KIND_GENERATED_COLUMN: err = _("cannot use subquery in column generation expression"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("cannot use subquery in publication WHERE expression"); + break; /* * There is intentionally no default: case here, so that the @@ -3561,6 +3571,8 @@ ParseExprKindName(ParseExprKind exprKind) return "WHERE"; case EXPR_KIND_GENERATED_COLUMN: return "GENERATED AS"; + case EXPR_KIND_PUBLICATION_WHERE: + return "publication expression"; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 8e926539e6..66458d8a48 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -2516,6 +2516,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) case EXPR_KIND_GENERATED_COLUMN: err = _("set-returning functions are not allowed in column generation expressions"); break; + case EXPR_KIND_PUBLICATION_WHERE: + err = _("set-returning functions are not allowed in publication WHERE expressions"); + break; /* * There is intentionally no default: case here, so that the diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 42db4ada9e..5468b694f6 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -637,19 +637,26 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in COPY. */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[2] = {OIDOID, CHAROID}; Oid attrRow[3] = {TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[1] = {TEXTOID}; bool isnull; - int natt; + int n; + ListCell *lc; + bool first; + + /* Avoid trashing relation map cache */ + memset(lrel, 0, sizeof(LogicalRepRelation)); lrel->nspname = nspname; lrel->relname = relname; @@ -713,20 +720,20 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(res->ntuples * sizeof(Oid)); lrel->attkeys = NULL; - natt = 0; + n = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = + lrel->attnames[n] = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + lrel->atttyps[n] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 3, &isnull))) - lrel->attkeys = bms_add_member(lrel->attkeys, natt); + lrel->attkeys = bms_add_member(lrel->attkeys, n); /* Should never happen. */ - if (++natt >= MaxTupleAttributeNumber) + if (++n >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", nspname, relname); @@ -734,7 +741,46 @@ fetch_remote_table_info(char *nspname, char *relname, } ExecDropSingleTupleTableSlot(slot); - lrel->natts = natt; + lrel->natts = n; + + walrcv_clear_result(res); + + /* Get relation qual */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT pg_get_expr(prqual, prrelid) FROM pg_publication p INNER JOIN pg_publication_rel pr ON (p.oid = pr.prpubid) WHERE pr.prrelid = %u AND p.pubname IN (", lrel->remoteid); + + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); pfree(cmd.data); @@ -750,6 +796,7 @@ copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; + List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyState cstate; @@ -758,7 +805,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -767,10 +814,57 @@ copy_table(Relation rel) relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + /* list of columns for COPY */ + attnamelist = make_copy_attnamelist(relmapentry); + /* Start copy on the publisher. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "COPY %s TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* + * If publication has any row filter, build a SELECT query with OR'ed row + * filters for COPY. If no row filters are available, use COPY for all + * table contents. + */ + if (list_length(qual) > 0) + { + ListCell *lc; + bool first; + + appendStringInfoString(&cmd, "COPY (SELECT "); + /* list of attribute names */ + first = true; + foreach(lc, attnamelist) + { + char *col = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + appendStringInfo(&cmd, "%s", quote_identifier(col)); + } + appendStringInfo(&cmd, " FROM %s", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + appendStringInfoString(&cmd, " WHERE "); + /* list of OR'ed filters */ + first = true; + foreach(lc, qual) + { + char *q = strVal(lfirst(lc)); + if (first) + first = false; + else + appendStringInfoString(&cmd, " OR "); + appendStringInfo(&cmd, "%s", q); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); + list_free_deep(qual); + } + else + { + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + } res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) @@ -785,7 +879,6 @@ copy_table(Relation rel) addRangeTableEntryForRelation(pstate, rel, AccessShareLock, NULL, false, false); - attnamelist = make_copy_attnamelist(relmapentry); cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); /* Do the copy */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d9952c8b7e..cef0c52955 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -172,7 +172,7 @@ ensure_transaction(void) * * This is based on similar code in copy.c */ -static EState * +EState * create_estate_for_relation(Relation rel) { EState *estate; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c08757fca..83fad465f8 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,15 +12,26 @@ */ #include "postgres.h" +#include "catalog/pg_type.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" + +#include "executor/executor.h" +#include "nodes/execnodes.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/planner.h" +#include "optimizer/optimizer.h" +#include "parser/parse_coerce.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/inval.h" #include "utils/int8.h" #include "utils/memutils.h" @@ -60,6 +71,7 @@ typedef struct RelationSyncEntry bool schema_sent; /* did we send the schema? */ bool replicate_valid; PublicationActions pubactions; + List *qual; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -335,6 +347,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* ... then check row filter */ + if (list_length(relentry->qual) > 0) + { + HeapTuple old_tuple; + HeapTuple new_tuple; + TupleDesc tupdesc; + EState *estate; + ExprContext *ecxt; + MemoryContext oldcxt; + ListCell *lc; + bool matched = true; + + old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + new_tuple = change->data.tp.newtuple ? &change->data.tp.newtuple->tuple : NULL; + tupdesc = RelationGetDescr(relation); + estate = create_estate_for_relation(relation); + + /* prepare context per tuple */ + ecxt = GetPerTupleExprContext(estate); + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); + ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsVirtual); + + ExecStoreHeapTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, false); + + foreach (lc, relentry->qual) + { + Node *qual; + ExprState *expr_state; + Expr *expr; + Oid expr_type; + Datum res; + bool isnull; + + qual = (Node *) lfirst(lc); + + /* evaluates row filter */ + expr_type = exprType(qual); + expr = (Expr *) coerce_to_target_type(NULL, qual, expr_type, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1); + expr = expression_planner(expr); + expr_state = ExecInitExpr(expr, NULL); + res = ExecEvalExpr(expr_state, ecxt, &isnull); + + /* if tuple does not match row filter, bail out */ + if (!DatumGetBool(res) || isnull) + { + matched = false; + break; + } + } + + MemoryContextSwitchTo(oldcxt); + + ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple); + FreeExecutorState(estate); + + if (!matched) + return; + } + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -570,10 +641,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->qual = NIL; foreach(lc, data->publications) { Publication *pub = lfirst(lc); + HeapTuple rf_tuple; + Datum rf_datum; + bool rf_isnull; if (pub->alltables || list_member_oid(pubids, pub->oid)) { @@ -583,9 +658,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; + /* Cache row filters, if available */ + rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rf_tuple)) + { + rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prqual, &rf_isnull); + + if (!rf_isnull) + { + MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext); + char *s = TextDatumGetCString(rf_datum); + Node *rf_node = stringToNode(s); + entry->qual = lappend(entry->qual, rf_node); + MemoryContextSwitchTo(oldctx); + } + + ReleaseSysCache(rf_tuple); + } } list_free(pubids); @@ -660,5 +749,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) */ hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) + { entry->replicate_valid = false; + if (list_length(entry->qual) > 0) + list_free_deep(entry->qual); + entry->qual = NIL; + } } diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 20a2f0ac1b..ae8d2845a0 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -78,15 +78,22 @@ typedef struct Publication PublicationActions pubactions; } Publication; +typedef struct PublicationRelationQual +{ + Relation relation; + Node *whereClause; +} PublicationRelationQual; + extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); extern List *GetPublicationRelations(Oid pubid); +extern List *GetPublicationRelationQuals(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel, bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 5f5bc92ab3..a75b2d5345 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -28,9 +28,13 @@ */ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) { - Oid oid; /* oid */ - Oid prpubid; /* Oid of the publication */ - Oid prrelid; /* Oid of the relation */ + Oid oid; /* oid */ + Oid prpubid; /* Oid of the publication */ + Oid prrelid; /* Oid of the relation */ + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + pg_node_tree prqual; /* qualifications */ +#endif } FormData_pg_publication_rel; /* ---------------- diff --git a/src/include/catalog/toasting.h b/src/include/catalog/toasting.h index cc5dfed0bf..d57ca82e89 100644 --- a/src/include/catalog/toasting.h +++ b/src/include/catalog/toasting.h @@ -66,6 +66,7 @@ DECLARE_TOAST(pg_namespace, 4163, 4164); DECLARE_TOAST(pg_partitioned_table, 4165, 4166); DECLARE_TOAST(pg_policy, 4167, 4168); DECLARE_TOAST(pg_proc, 2836, 2837); +DECLARE_TOAST(pg_publication_rel, 8287, 8288); DECLARE_TOAST(pg_rewrite, 2838, 2839); DECLARE_TOAST(pg_seclabel, 3598, 3599); DECLARE_TOAST(pg_statistic, 2840, 2841); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 3cbb08df92..7f83da1ee8 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -476,6 +476,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationTable, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 94ded3c135..359f773092 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3461,12 +3461,19 @@ typedef struct AlterTSConfigurationStmt } AlterTSConfigurationStmt; +typedef struct PublicationTable +{ + NodeTag type; + RangeVar *relation; /* relation to be published */ + Node *whereClause; /* qualifications */ +} PublicationTable; + typedef struct CreatePublicationStmt { NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *tables; /* Optional list of PublicationTable to add */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3479,7 +3486,7 @@ typedef struct AlterPublicationStmt List *options; /* List of DefElem nodes */ /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ - List *tables; /* List of tables to add/drop */ + List *tables; /* List of PublicationTable to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What action to perform with the tables */ } AlterPublicationStmt; diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 7c099e7084..c2e8b9fcb9 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -73,6 +73,7 @@ typedef enum ParseExprKind EXPR_KIND_CALL_ARGUMENT, /* procedure argument in CALL */ EXPR_KIND_COPY_WHERE, /* WHERE condition in COPY FROM */ EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */ + EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */ } ParseExprKind; diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 2642a3f94e..5cc307ee0e 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp); extern char *logicalrep_typmap_gettypname(Oid remoteid); +extern EState *create_estate_for_relation(Relation rel); + #endif /* LOGICALRELATION_H */ diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..202173c376 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -116,6 +116,35 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +CREATE TABLE testpub_rf_tbl1 (a integer, b text); +CREATE TABLE testpub_rf_tbl2 (c text, d integer); +CREATE TABLE testpub_rf_tbl3 (e integer); +CREATE TABLE testpub_rf_tbl4 (g text); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5); +RESET client_min_messages; +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; +-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) +ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); +-- fail - functions disallowed +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); +ERROR: functions are not allowed in WHERE +LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ... + ^ +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t +Tables: + "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500)) + +DROP TABLE testpub_rf_tbl1; +DROP TABLE testpub_rf_tbl2; +DROP TABLE testpub_rf_tbl3; +DROP TABLE testpub_rf_tbl4; +DROP PUBLICATION testpub5; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5773a755cf..6f0d088984 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -69,6 +69,27 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +CREATE TABLE testpub_rf_tbl1 (a integer, b text); +CREATE TABLE testpub_rf_tbl2 (c text, d integer); +CREATE TABLE testpub_rf_tbl3 (e integer); +CREATE TABLE testpub_rf_tbl4 (g text); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5); +RESET client_min_messages; +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000); +ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2; +-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression) +ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500); +-- fail - functions disallowed +ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6); +\dRp+ testpub5 + +DROP TABLE testpub_rf_tbl1; +DROP TABLE testpub_rf_tbl2; +DROP TABLE testpub_rf_tbl3; +DROP TABLE testpub_rf_tbl4; +DROP PUBLICATION testpub5; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; diff --git a/src/test/subscription/t/013_row_filter.pl b/src/test/subscription/t/013_row_filter.pl new file mode 100644 index 0000000000..99e6db94d6 --- /dev/null +++ b/src/test/subscription/t/013_row_filter.pl @@ -0,0 +1,96 @@ +# Test logical replication behavior with row filtering +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_1 (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_2 (c int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); + +# setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_1 (a int primary key, b text)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_2 (c int primary key)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)"); + +# setup logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"); + +my $result = $node_publisher->psql('postgres', + "ALTER PUBLICATION tap_pub_1 DROP TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"); +is($result, 3, "syntax error for ALTER PUBLICATION DROP TABLE"); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"); + +# test row filtering +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 10)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2" +); + +$node_publisher->wait_for_catchup($appname); + +# wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +#$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter_1"); +is($result, qq(1980|not filtered +1001|test 1001 +1002|test 1002), 'check filtered data was copied to subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(c), min(c), max(c) FROM tab_rowfilter_2"); +is($result, qq(7|2|10), 'check filtered data was copied to subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(a) FROM tab_rowfilter_3"); +is($result, qq(10), 'check filtered data was copied to subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.11.0