From 1cb19ad525fc6903f9b55568c16254dca3c6736c Mon Sep 17 00:00:00 2001 From: wangw Date: Tue, 14 Jun 2022 11:23:52 +0800 Subject: [PATCH v19 3/4] Add some checks before using apply background worker to apply changes. streaming=parallel mode has two requirements: 1) The unique column in the relation on the subscriber-side should also be the unique column on the publisher-side; 2) There cannot be any non-immutable functions used by the subscriber-side replicated table. Look for functions in the following places: * a. Trigger functions * b. Column default value expressions and domain constraints * c. Constraint expressions * d. Foreign keys --- doc/src/sgml/ref/create_subscription.sgml | 4 + .../replication/logical/applybgworker.c | 44 ++ src/backend/replication/logical/proto.c | 88 +++- src/backend/replication/logical/relation.c | 201 +++++++++ src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/logical/worker.c | 23 +- src/backend/utils/cache/typcache.c | 17 + src/include/replication/logicalproto.h | 1 + src/include/replication/logicalrelation.h | 15 + src/include/replication/worker_internal.h | 1 + src/include/utils/typcache.h | 2 + .../subscription/t/022_twophase_cascade.pl | 6 + .../subscription/t/032_streaming_apply.pl | 380 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 14 files changed, 775 insertions(+), 9 deletions(-) create mode 100644 src/test/subscription/t/032_streaming_apply.pl diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index b08e4b5580..832899570f 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -240,6 +240,10 @@ CREATE SUBSCRIPTION subscription_nameparallel mode has two requirements: 1) the unique + column in the relation on the subscriber-side should also be the + unique column on the publisher-side; 2) there cannot be any + non-immutable functions used by the subscriber-side replicated table. 
diff --git a/src/backend/replication/logical/applybgworker.c b/src/backend/replication/logical/applybgworker.c index aa222490a0..89c712f785 100644 --- a/src/backend/replication/logical/applybgworker.c +++ b/src/backend/replication/logical/applybgworker.c @@ -800,3 +800,47 @@ apply_bgworker_subxact_info_add(TransactionId current_xid) MemoryContextSwitchTo(oldctx); } } + +/* + * Check if changes on this relation can be applied by an apply background + * worker. + * + * Although the commit order is maintained by only allowing one process to + * commit at a time, the access order to the relation has changed. This could + * cause unexpected problems if the unique column on the replicated table is + * inconsistent with the publisher-side or contains non-immutable functions + * when applying transactions in the apply background worker. + */ +void +apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) +{ + /* Skip check if not an apply background worker. */ + if (!am_apply_bgworker()) + return; + + /* + * Partition table checks are done later in function + * apply_handle_tuple_routing. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + /* + * Return if changes on this relation can be applied by an apply background + * worker. + */ + if (rel->parallel_apply == PARALLEL_APPLY_SAFE) + return; + + /* We are in error mode and should give the user the correct error. 
*/ + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot replicate target relation \"%s.%s\" using " + "subscription parameter streaming=parallel", + rel->remoterel.nspname, rel->remoterel.relname), + errdetail("The unique column on subscriber is not the unique " + "column on publisher or there is at least one " + "non-immutable function."), + errhint("Please change to use subscription parameter " + "streaming=on."))); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 47bd811fb7..511bd9c052 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -23,7 +23,8 @@ /* * Protocol message flags. */ -#define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define ATTR_IS_REPLICA_IDENTITY (1 << 0) +#define ATTR_IS_UNIQUE (1 << 1) #define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) @@ -40,6 +41,68 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); +static Bitmapset *RelationGetUniqueKeyBitmap(Relation rel); + +/* + * RelationGetUniqueKeyBitmap -- get a bitmap of unique attribute numbers + * + * This is similar to RelationGetIdentityKeyBitmap(), but returns a bitmap of + * index attribute numbers for all unique indexes. 
+ */ +static Bitmapset * +RelationGetUniqueKeyBitmap(Relation rel) +{ + List *indexoidlist = NIL; + ListCell *indexoidscan; + Bitmapset *attunique = NULL; + + if (!rel->rd_rel->relhasindex) + return NULL; + + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + int i; + + /* Look up the description for index */ + indexRel = RelationIdGetRelation(indexoid); + + if (!RelationIsValid(indexRel)) + elog(ERROR, "could not open relation with OID %u", indexoid); + + if (!indexRel->rd_index->indisunique) + { + RelationClose(indexRel); + continue; + } + + /* Add referenced attributes to attunique */ + for (i = 0; i < indexRel->rd_index->indnatts; i++) + { + int attrnum = indexRel->rd_index->indkey.values[i]; + + /* + * We don't include non-key columns in the attunique + * bitmap. See RelationGetIndexAttrBitmap. + */ + if (attrnum != 0) + { + if (i < indexRel->rd_index->indnkeyatts && + !bms_is_member(attrnum - FirstLowInvalidHeapAttributeNumber, attunique)) + attunique = bms_add_member(attunique, + attrnum - FirstLowInvalidHeapAttributeNumber); + } + } + RelationClose(indexRel); + } + list_free(indexoidlist); + + return attunique; +} + /* * Check if a column is covered by a column list. 
* @@ -933,7 +996,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) TupleDesc desc; int i; uint16 nliveatts = 0; - Bitmapset *idattrs = NULL; + Bitmapset *idattrs = NULL, + *attunique = NULL; bool replidentfull; desc = RelationGetDescr(rel); @@ -958,6 +1022,9 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) if (!replidentfull) idattrs = RelationGetIdentityKeyBitmap(rel); + /* fetch bitmap of UNIQUE attributes */ + attunique = RelationGetUniqueKeyBitmap(rel); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -974,7 +1041,11 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs)) - flags |= LOGICALREP_IS_REPLICA_IDENTITY; + flags |= ATTR_IS_REPLICA_IDENTITY; + + if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + attunique)) + flags |= ATTR_IS_UNIQUE; pq_sendbyte(out, flags); @@ -989,6 +1060,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) } bms_free(idattrs); + bms_free(attunique); } /* @@ -1001,7 +1073,8 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) int natts; char **attnames; Oid *atttyps; - Bitmapset *attkeys = NULL; + Bitmapset *attkeys = NULL, + *attunique = NULL; natts = pq_getmsgint(in, 2); attnames = palloc(natts * sizeof(char *)); @@ -1014,9 +1087,13 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) /* Check for replica identity column */ flags = pq_getmsgbyte(in); - if (flags & LOGICALREP_IS_REPLICA_IDENTITY) + if (flags & ATTR_IS_REPLICA_IDENTITY) attkeys = bms_add_member(attkeys, i); + /* Check for unique column */ + if (flags & ATTR_IS_UNIQUE) + attunique = bms_add_member(attunique, i); + /* attribute name */ attnames[i] = pstrdup(pq_getmsgstring(in)); @@ -1030,6 +1107,7 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) rel->attnames = attnames; rel->atttyps = atttyps; 
rel->attkeys = attkeys; + rel->attunique = attunique; rel->natts = natts; } diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index e989047681..37e410b8d0 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -19,12 +19,19 @@ #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" +#include "rewrite/rewriteHandler.h" #include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "utils/typcache.h" static MemoryContext LogicalRepRelMapContext = NULL; @@ -91,6 +98,26 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) } } +/* + * Relcache invalidation callback to reset parallel flag. + */ +static void +logicalrep_relmap_reset_parallel_cb(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS hash_seq; + LogicalRepRelMapEntry *entry; + + if (LogicalRepRelMap == NULL) + return; + + hash_seq_init(&hash_seq, LogicalRepRelMap); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + entry->parallel_apply = PARALLEL_APPLY_UNKNOWN; + entry->localrelvalid = false; + } +} + /* * Initialize the relation map cache. */ @@ -116,6 +143,9 @@ logicalrep_relmap_init(void) /* Watch for invalidation events. 
*/ CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PROCOID, + logicalrep_relmap_reset_parallel_cb, + (Datum) 0); } /* @@ -142,6 +172,7 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) pfree(remoterel->atttyps); } bms_free(remoterel->attkeys); + bms_free(remoterel->attunique); if (entry->attrmap) free_attrmap(entry->attrmap); @@ -190,6 +221,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + entry->remoterel.attunique = bms_copy(remoterel->attunique); MemoryContextSwitchTo(oldctx); } @@ -310,6 +342,168 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Check if changes on one relation can be applied by an apply background + * worker and assign the 'parallel_apply' flag. + * + * There are two requirements for applying changes in an apply background + * worker: 1) The unique column in the relation on the subscriber-side should + * also be the unique column on the publisher-side; 2) There cannot be any + * non-immutable functions used by the subscriber-side. + * + * We just mark the relation entry as 'PARALLEL_APPLY_UNSAFE' here if changes + * on one relation can not be applied by an apply background worker and leave + * it to apply_bgworker_relation_check() to throw the actual error if needed. + */ +static void +logicalrep_rel_mark_parallel_apply(LogicalRepRelMapEntry *entry) +{ + Bitmapset *ukey; + int i; + TupleDesc tupdesc; + int attnum; + List *fkeys = NIL; + + /* Fast path if 'parallel_apply' flag is already known. */ + if (entry->parallel_apply != PARALLEL_APPLY_UNKNOWN) + return; + + /* Initialize the flag. */ + entry->parallel_apply = PARALLEL_APPLY_SAFE; + + /* + * First, check if the unique column in the relation on the subscriber-side + * is also the unique column on the publisher-side. 
+ */ + ukey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_KEY); + + if (ukey) + { + i = -1; + while ((i = bms_next_member(ukey, i)) >= 0) + { + attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber); + + if (entry->attrmap->attnums[attnum] < 0 || + !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + + bms_free(ukey); + } + + /* + * Then, check if there is any non-immutable function used by the local + * table. Look for functions in the following places: + * a. trigger functions; + * b. Column default value expressions and domain constraints; + * c. Constraint expressions; + * d. Foreign keys. + */ + /* Check the trigger functions. */ + if (entry->localrel->trigdesc != NULL) + { + for (i = 0; i < entry->localrel->trigdesc->numtriggers; i++) + { + Trigger *trig = entry->localrel->trigdesc->triggers + i; + + if (trig->tgenabled != TRIGGER_FIRES_ALWAYS && + trig->tgenabled != TRIGGER_FIRES_ON_REPLICA) + continue; + + if (func_volatile(trig->tgfoid) != PROVOLATILE_IMMUTABLE) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + } + + /* Check the columns. 
*/ + tupdesc = RelationGetDescr(entry->localrel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + /* + * We don't need to check columns that only exist on the + * subscriber + */ + if (entry->attrmap->attnums[attnum] < 0) + continue; + + if (att->atthasdef) + { + Node *defaultexpr; + + defaultexpr = build_column_default(entry->localrel, attnum + 1); + if (contain_mutable_functions(defaultexpr)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + + /* + * If the column is of a DOMAIN type, determine whether + * that domain has any CHECK expressions that are not + * immutable. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + List *domain_constraints; + ListCell *lc; + + domain_constraints = GetDomainConstraints(att->atttypid); + + foreach(lc, domain_constraints) + { + DomainConstraintState *con = (DomainConstraintState *) lfirst(lc); + + if (con->check_expr && contain_mutable_functions((Node *) con->check_expr)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + } + } + + /* Check the constraints. */ + if (tupdesc->constr) + { + ConstrCheck *check = tupdesc->constr->check; + + /* + * Determine if there are any CHECK constraints which + * contains non-immutable function. + */ + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check[i].ccbin); + + if (contain_mutable_functions((Node *) check_expr)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + } + + /* Check the foreign keys. */ + fkeys = RelationGetFKeyList(entry->localrel); + if (fkeys) + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; +} + /* * Open the local relation associated with the remote one. 
* @@ -438,6 +632,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) */ logicalrep_rel_mark_updatable(entry); + /* Set if changes could be applied in the apply background worker. */ + logicalrep_rel_mark_parallel_apply(entry); + entry->localrelvalid = true; } @@ -653,6 +850,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + entry->remoterel.attunique = bms_copy(remoterel->attunique); } entry->localrel = partrel; @@ -696,6 +894,9 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, /* Set if the table's replica identity is enough to apply update/delete. */ logicalrep_rel_mark_updatable(entry); + /* Set if changes could be applied in the apply background worker. */ + logicalrep_rel_mark_parallel_apply(entry); + entry->localrelvalid = true; /* state and statelsn are left set to 0. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8ffba7e2e5..3cdbf8b457 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -884,6 +884,7 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + lrel->attunique = NULL; /* * Store the columns as a list of names. Ignore those that are not diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2aa7797628..41793d26c2 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1391,6 +1391,14 @@ apply_handle_stream_stop(StringInfo s) { char action = LOGICAL_REP_MSG_STREAM_STOP; + /* + * Unlike stream_commit, we don't need to wait here for stream_stop to + * finish. 
Allowing the other transaction to be applied before + * stream_stop is finished can lead to failures if the unique + * index/constraint is different between publisher and subscriber. But + * for such cases, we don't allow streamed transactions to be applied + * in parallel. See apply_bgworker_relation_check. + */ apply_bgworker_send_data(stream_apply_worker, 1, &action); elog(DEBUG1, "stopped streaming of xid %u, %u changes streamed", stream_xid, nchanges); @@ -2044,6 +2052,8 @@ apply_handle_insert(StringInfo s) /* Set relation for error callback */ apply_error_callback_arg.rel = rel; + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2187,6 +2197,8 @@ apply_handle_update(StringInfo s) /* Check if we can do the update. */ check_relation_updatable(rel); + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2355,6 +2367,8 @@ apply_handle_delete(StringInfo s) /* Check if we can do the delete. */ check_relation_updatable(rel); + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2540,13 +2554,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + /* Check if we can do the update or delete on the leaf partition. 
*/ if (operation == CMD_UPDATE || operation == CMD_DELETE) - { - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); check_relation_updatable(part_entry); - } + + apply_bgworker_relation_check(part_entry); switch (operation) { diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 808f9ebd0d..b248899d82 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -2540,6 +2540,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2) return 0; } +/* + * GetDomainConstraints --- get DomainConstraintState list of specified domain type + */ +List * +GetDomainConstraints(Oid type_id) +{ + TypeCacheEntry *typentry; + List *constraints = NIL; + + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO); + + if(typentry->domainData != NULL) + constraints = typentry->domainData->constraints; + + return constraints; +} + /* * Load (or re-load) the enumData member of the typcache entry. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index eb0fd24fd8..4395c11f75 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -113,6 +113,7 @@ typedef struct LogicalRepRelation char replident; /* replica identity */ char relkind; /* remote relation kind */ Bitmapset *attkeys; /* Bitmap of key columns */ + Bitmapset *attunique; /* Bitmap of unique columns */ } LogicalRepRelation; /* Type mapping info */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 78cd7e77f5..8011e648d7 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -15,6 +15,19 @@ #include "access/attmap.h" #include "replication/logicalproto.h" +/* + * States to determine if changes on one relation can be applied using an + * apply background worker. 
+ */ +typedef enum ParalleApplySafety +{ + PARALLEL_APPLY_UNKNOWN = 0, /* unknown */ + PARALLEL_APPLY_SAFE, /* Can apply changes in an apply background + worker */ + PARALLEL_APPLY_UNSAFE /* Can not apply changes in an apply background + worker */ +} ParalleApplySafety; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -31,6 +44,8 @@ typedef struct LogicalRepRelMapEntry Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ + ParalleApplySafety parallel_apply; /* Can apply changes in an apply + background worker? */ /* Sync state. */ char state; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a3560d4904..1c0db05c8a 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -194,6 +194,7 @@ extern void apply_bgworker_free(ApplyBgworkerState *wstate); extern void apply_bgworker_check_status(void); extern void apply_bgworker_set_status(ApplyBgworkerStatus status); extern void apply_bgworker_subxact_info_add(TransactionId current_xid); +extern void apply_bgworker_relation_check(LogicalRepRelMapEntry *rel); static inline bool am_tablesync_worker(void) diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 431ad7f1b3..ed7c2e7f48 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern List *GetDomainConstraints(Oid type_id); + extern size_t SharedRecordTypmodRegistryEstimate(void); extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 
0a4152d3be..30a01f7305 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -39,6 +39,12 @@ sub test_streaming ALTER SUBSCRIPTION tap_sub_C SET (streaming = $streaming_mode)"); + if ($streaming_mode eq 'parallel') + { + $node_C->safe_psql( + 'postgres', "ALTER TABLE test_tab ALTER c DROP DEFAULT"); + } + # Wait for subscribers to finish initialization $node_A->poll_query_until( diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl new file mode 100644 index 0000000000..7f8bfa6745 --- /dev/null +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -0,0 +1,380 @@ +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test the restrictions of streaming mode "parallel" in logical replication + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $offset = 0; + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab_partitioned (a int primary key, b varchar)"); + +# Setup structure on subscriber +# We need to test normal table and partition table. 
+$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab_partitioned (a int primary key, b varchar) PARTITION BY RANGE(a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab_partition (LIKE test_tab_partitioned)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partitioned ATTACH PARTITION test_tab_partition DEFAULT" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_partitioned FOR TABLE test_tab_partitioned"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub, tap_pub_partitioned + WITH (streaming = parallel, copy_data = false)"); + +$node_publisher->wait_for_catchup($appname); + +# It is not allowed that the unique index on the publisher and the subscriber +# is different. Check the error reported by background worker in this case. +# First we check the unique index on normal table. +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX test_tab_b_idx ON test_tab (b)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" using subscription parameter streaming=parallel/, + $offset); + +# Drop the unique index on the subscriber, now it works. 
+$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_idx"); + +$node_publisher->wait_for_catchup($appname); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), 'data replicated to subscriber after dropping index'); + +# Then we check the unique index on partition table. +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX test_tab_b_partition_idx ON test_tab_partition (b)"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), 'data replicated to subscriber after dropping index'); + +# Triggers which execute non-immutable function are not allowed on the +# subscriber side. Check the error reported by background worker in this case. +# First we check the trigger function on normal table. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION trigger_func() RETURNS TRIGGER AS \$\$ + BEGIN + RETURN NULL; + END +\$\$ language plpgsql; +CREATE TRIGGER insert_trig +BEFORE INSERT ON test_tab +FOR EACH ROW EXECUTE PROCEDURE trigger_func(); +ALTER TABLE test_tab ENABLE REPLICA TRIGGER insert_trig; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? 
cannot replicate target relation "public.test_tab" using subscription parameter streaming=parallel/, + $offset); + +# Drop the trigger on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(0), 'data replicated to subscriber after dropping trigger'); + +# Then we check the trigger function on partition table. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE TRIGGER insert_trig +BEFORE INSERT ON test_tab_partition +FOR EACH ROW EXECUTE PROCEDURE trigger_func(); +ALTER TABLE test_tab_partition ENABLE REPLICA TRIGGER insert_trig; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab_partitioned"); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop the trigger on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(0), 'data replicated to subscriber after dropping trigger'); + +# It is not allowed that column default value expression contains a +# non-immutable function on the subscriber side. Check the error reported by +# background worker in this case. +# First we check the column default value expression on normal table. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN b SET DEFAULT random()"); + +# Check the subscriber log from now on. 
+$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" using subscription parameter streaming=parallel/, + $offset); + +# Drop default value on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN b DROP DEFAULT"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), + 'data replicated to subscriber after dropping default value expression'); + +# Then we check the column default value expression on partition table. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b SET DEFAULT random()"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop default value on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), + 'data replicated to subscriber after dropping default value expression'); + +# It is not allowed that domain constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. 
+# Because the column type of the partition table must be the same as its parent +# table, only test normal table here. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE DOMAIN test_domain AS int CHECK(VALUE > random()); +ALTER TABLE test_tab ALTER COLUMN a TYPE test_domain; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" using subscription parameter streaming=parallel/, + $offset); + +# Drop domain constraint expression on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN a TYPE int"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(0), + 'data replicated to subscriber after dropping domain constraint expression' +); + +# It is not allowed that constraint expression contains a non-immutable function +# on the subscriber side. Check the error reported by background worker in this +# case. +# First we check the constraint expression on normal table. +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab ADD CONSTRAINT test_tab_con check (a > random()); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" using subscription parameter streaming=parallel/, + $offset); + +# Drop constraint on the subscriber, now it works. 
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_tab DROP CONSTRAINT test_tab_con");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, qq(5000),
+	'data replicated to subscriber after dropping constraint expression');
+
+# Then we check the constraint expression on a partition table.
+$node_subscriber->safe_psql(
+	'postgres', qq{
+ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_con check (a > random());
+});
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab_partitioned");
+
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/,
+	$offset);
+
+# Drop constraint on the subscriber, now it works.
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned");
+is($result, qq(0),
+	'data replicated to subscriber after dropping constraint expression');
+
+# Foreign keys are not allowed on the subscriber side. Check the error
+# reported by the background worker in this case.
+# First we check the foreign key on a normal table.
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql(
+	'postgres', qq{
+CREATE TABLE test_tab_f (a int primary key);
+ALTER TABLE test_tab ADD CONSTRAINT test_tabfk FOREIGN KEY(a) REFERENCES test_tab_f(a);
+});
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" using subscription parameter streaming=parallel/, + $offset); + +# Drop the foreign key constraint on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tabfk"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), + 'data replicated to subscriber after dropping the foreign key'); + +# Then we check the foreign key on partition table. +$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE TABLE test_tab_partition_f (a int primary key); +ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_patition_fk FOREIGN KEY(a) REFERENCES test_tab_partition_f(a); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop the foreign key constraint on the subscriber, now it works. 
+$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), + 'data replicated to subscriber after dropping the foreign key'); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 4137dc77b4..697e6a7ba3 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1891,6 +1891,7 @@ PageXLogRecPtr PagetableEntry Pairs ParallelAppendState +ParallelApplySafety ParallelBitmapHeapState ParallelBlockTableScanDesc ParallelBlockTableScanWorker -- 2.23.0.windows.1