From 56f75382e01bbf8cb6572b7302c4395f761b12d7 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Thu, 2 Jun 2022 17:39:37 +0300 Subject: [PATCH v17 5/5] Reuse Replication Slot and Origin in Tablesync This commit allows reusing replication slots and origins during tablesync. Earlier, a tablesync worker created a new replication slot and origin each time it synced a new table. With this patch, replication slots/origins can be reused for tablesync. This reduces the overhead of creating/dropping replication slots and origins and improves tablesync speed significantly, especially for empty or small tables. If the state of the current table is INIT or DATASYNC, the tablesync worker needs a replication slot/origin. If the worker has not created a slot and origin in its previous runs, it will create those first. Otherwise the worker reuses the slot and origin created by the same worker in earlier iterations. Tables in FINISHEDCOPY are expected to have a replication slot and origin. Slot and origin names for such tables are persisted in the pg_subscription_rel catalog. The tablesync worker can fetch them and proceed with the existing slot and origin of FINISHEDCOPY tables, so it does not need to create new ones. 
Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 31 ++ src/backend/catalog/pg_subscription.c | 247 +++++++++++- src/backend/commands/subscriptioncmds.c | 229 +++++++---- .../replication/logical/applyparallelworker.c | 3 +- src/backend/replication/logical/launcher.c | 8 +- src/backend/replication/logical/tablesync.c | 373 +++++++++++++----- src/backend/replication/logical/worker.c | 57 ++- src/include/catalog/pg_subscription.h | 6 + src/include/catalog/pg_subscription_rel.h | 14 +- src/include/replication/slot.h | 3 +- src/include/replication/worker_internal.h | 23 +- 11 files changed, 803 insertions(+), 191 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 852cb30ae1..60718ab587 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8030,6 +8030,19 @@ SCRAM-SHA-256$<iteration count>:&l origin. + + + + sublastusedid int8 + + + The last used ID for tablesync workers. It acts as an unique identifier + for replication slots which are created by tablesync workers. + The last used ID needs to be persisted to make logical replication safely + proceed after any interruption. If sublastusedid is 0, then no table has + been synced yet. 
+ + @@ -8114,6 +8127,24 @@ SCRAM-SHA-256$<iteration count>:&l otherwise null + + + + srrelslotname name + + + Replication slot name that is used for synchronization of relation + + + + + + srreloriginname name + + + Origin name that is used for tracking synchronization of relation + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d07f88ce28..152fdaa310 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -108,6 +108,14 @@ GetSubscription(Oid subid, bool missing_ok) Anum_pg_subscription_suborigin); sub->origin = TextDatumGetCString(datum); + /* Get last used id */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_sublastusedid, + &isnull); + Assert(!isnull); + sub->lastusedid = DatumGetInt64(datum); + ReleaseSysCache(tup); return sub; @@ -199,6 +207,44 @@ DisableSubscription(Oid subid) table_close(rel, NoLock); } +/* + * Update the last used replication slot ID for the given subscription. + */ +void +UpdateSubscriptionLastSlotId(Oid subid, int64 lastusedid) +{ + Relation rel; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", subid); + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + /* Form a new tuple. 
*/ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_subscription_sublastusedid - 1] = true; + values[Anum_pg_subscription_sublastusedid- 1] = Int64GetDatum(lastusedid); + + /* Update the catalog */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + heap_freetuple(tup); + + table_close(rel, NoLock); +} + /* * Convert text array to list of strings. * @@ -228,7 +274,7 @@ textarray_to_stringlist(ArrayType *textarray) */ void AddSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) + XLogRecPtr sublsn, char *relslotname, char *reloriginname) { Relation rel; HeapTuple tup; @@ -257,6 +303,16 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); else nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + if (relslotname) + values[Anum_pg_subscription_rel_srrelslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(relslotname)); + else + nulls[Anum_pg_subscription_rel_srrelslotname - 1] = true; + if (reloriginname) + values[Anum_pg_subscription_rel_srreloriginname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(reloriginname)); + else + nulls[Anum_pg_subscription_rel_srreloriginname - 1] = true; tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -269,6 +325,60 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, table_close(rel, NoLock); } +/* + * Internal function to modify columns for relation state update + */ +static void +UpdateSubscriptionRelState_internal(Datum *values, + bool *nulls, + bool *replaces, + char state, + XLogRecPtr sublsn) +{ + replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + + replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; + if (sublsn != InvalidXLogRecPtr) + 
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; +} + +/* + * Internal function to modify columns for replication slot update + */ +static void +UpdateSubscriptionRelReplicationSlot_internal(Datum *values, + bool *nulls, + bool *replaces, + char *relslotname) +{ + replaces[Anum_pg_subscription_rel_srrelslotname - 1] = true; + if (relslotname) + values[Anum_pg_subscription_rel_srrelslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(relslotname)); + else + nulls[Anum_pg_subscription_rel_srrelslotname - 1] = true; +} + +/* + * Internal function to modify columns for replication origin update + */ +static void +UpdateSubscriptionRelOrigin_internal(Datum *values, + bool *nulls, + bool *replaces, + char *reloriginname) +{ + replaces[Anum_pg_subscription_rel_srreloriginname - 1] = true; + if (reloriginname) + values[Anum_pg_subscription_rel_srreloriginname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(reloriginname)); + else + nulls[Anum_pg_subscription_rel_srreloriginname - 1] = true; +} + /* * Update the state of a subscription table. */ @@ -299,14 +409,56 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); - replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; - values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + UpdateSubscriptionRelState_internal(values, nulls, replaces, state, sublsn); - replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; - if (sublsn != InvalidXLogRecPtr) - values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); - else - nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. 
*/ + table_close(rel, NoLock); +} + +/* + * Update replication slot name, origin name and state of + * a subscription table in one transaction. + */ +void +UpdateSubscriptionRel(Oid subid, + Oid relid, + char state, + XLogRecPtr sublsn, + char *relslotname, + char *reloriginname) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription_rel]; + Datum values[Natts_pg_subscription_rel]; + bool replaces[Natts_pg_subscription_rel]; + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + + /* Try finding existing mapping. */ + tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription table %u in subscription %u does not exist", + relid, subid); + + /* Update the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + UpdateSubscriptionRelState_internal(values, nulls, replaces, state, sublsn); + UpdateSubscriptionRelReplicationSlot_internal(values, nulls, replaces, relslotname); + UpdateSubscriptionRelOrigin_internal(values, nulls, replaces, reloriginname); tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); @@ -318,6 +470,85 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, table_close(rel, NoLock); } +/* + * Get origin name of subscription table. + * + * reloriginname's value has the replication origin name if the origin exists. + */ +void +GetSubscriptionRelOrigin(Oid subid, Oid relid, char *reloriginname, bool *isnull) +{ + HeapTuple tup; + Relation rel; + Datum d; + char *originname; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + /* Try finding the mapping. 
*/ + tup = SearchSysCache2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + { + table_close(rel, AccessShareLock); + } + + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + Anum_pg_subscription_rel_srreloriginname, isnull); + if (!*isnull) + { + originname = DatumGetCString(DirectFunctionCall1(nameout, d)); + memcpy(reloriginname, originname, NAMEDATALEN); + } + + /* Cleanup */ + ReleaseSysCache(tup); + + table_close(rel, AccessShareLock); +} + +/* + * Get replication slot name of subscription table. + * + * slotname's value has the replication slot name if the subscription has any. + */ +void +GetSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *slotname) +{ + HeapTuple tup; + Relation rel; + Datum d; + char *relrepslot; + bool isnull; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + /* Try finding the mapping. */ + tup = SearchSysCache2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + { + table_close(rel, AccessShareLock); + } + + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + Anum_pg_subscription_rel_srrelslotname, &isnull); + if (!isnull) + { + relrepslot = DatumGetCString(DirectFunctionCall1(nameout, d)); + memcpy(slotname, relrepslot, NAMEDATALEN); + } + + /* Cleanup */ + ReleaseSysCache(tup); + + table_close(rel, AccessShareLock); +} + /* * Get state of subscription table. 
* diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d4e798baeb..4f5b3c572c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -710,6 +710,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_sublastusedid - 1] = Int64GetDatum(0); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -773,7 +774,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, rv->schemaname, rv->relname); AddSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + InvalidXLogRecPtr, NULL, NULL); } /* @@ -864,6 +865,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, SubRemoveRels *sub_remove_rels; WalReceiverConn *wrconn; bool must_use_password; + List *sub_remove_slots = NIL; + LogicalRepWorker *worker; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -943,7 +946,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); + InvalidXLogRecPtr, NULL, NULL); ereport(DEBUG1, (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", rv->schemaname, rv->relname, sub->name))); @@ -967,6 +970,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { char state; XLogRecPtr statelsn; + char slotname[NAMEDATALEN] = {0}; /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -993,13 +997,36 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + /* + * Find the logical replication sync worker. If exists, store + * the slot number for dropping associated replication slots + * later. 
+ */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(sub->oid, relid, false); + if (worker) + { + logicalrep_worker_stop(sub->oid, relid); + sub_remove_slots = lappend(sub_remove_slots, &worker->slot_name); + } + else + { + /* + * Sync of this relation might be failed in an earlier + * attempt, but the replication slot might still exist. + */ + GetSubscriptionRelReplicationSlot(sub->oid, relid, slotname); + if (strlen(slotname) > 0) + sub_remove_slots = lappend(sub_remove_slots, slotname); + } + LWLockRelease(LogicalRepWorkerLock); /* * For READY state, we would have already dropped the * tablesync origin. */ - if (state != SUBREL_STATE_READY) + if (state != SUBREL_STATE_READY && + state != SUBREL_STATE_SYNCDONE) { char originname[NAMEDATALEN]; @@ -1027,31 +1054,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* - * Drop the tablesync slots associated with removed tables. This has - * to be at the end because otherwise if there is an error while doing - * the database operations we won't be able to rollback dropped slots. + * Drop the replication slots associated with tablesync workers for + * removed tables. This has to be at the end because otherwise if + * there is an error while doing the database operations we won't be + * able to rollback dropped slots. */ - for (off = 0; off < remove_rel_len; off++) + foreach(lc, sub_remove_slots) { - if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) - { - char syncslotname[NAMEDATALEN] = {0}; + char syncslotname[NAMEDATALEN] = {0}; - /* - * For READY/SYNCDONE states we know the tablesync slot has - * already been dropped by the tablesync worker. - * - * For other states, there is no certainty, maybe the slot - * does not exist yet. Also, if we fail after removing some of - * the slots, next time, it will again try to drop already - * dropped slots and fail. 
For these reasons, we allow - * missing_ok = true for the drop. - */ - ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, - syncslotname, sizeof(syncslotname)); - ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); - } + memcpy(syncslotname, lfirst(lc), sizeof(NAMEDATALEN)); + + /* + * There is no certainty, maybe the slot does not exist yet. Also, + * if we fail after removing some of the slots, next time, it will + * again try to drop already dropped slots and fail. For these + * reasons, we allow missing_ok = true for the drop. + */ + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } PG_FINALLY(); @@ -1474,6 +1494,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *subname; char *conninfo; char *slotname; + int64 lastusedid; List *subworkers; ListCell *lc; char originname[NAMEDATALEN]; @@ -1546,6 +1567,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) else slotname = NULL; + /* Get the last used identifier by the subscription */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_sublastusedid, &isnull); + if (!isnull) + lastusedid = DatumGetInt64(datum); + else + lastusedid = 0; + /* * Since dropping a replication slot is not transactional, the replication * slot stays dropped even if the transaction rolls back. So we cannot @@ -1595,6 +1624,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } list_free(subworkers); + rstates = GetSubscriptionRelations(subid, true); + /* * Remove the no-longer-useful entry in the launcher's table of apply * worker start times. @@ -1606,36 +1637,26 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ApplyLauncherForgetWorkerStartTime(subid); /* - * Cleanup of tablesync replication origins. - * - * Any READY-state relations would already have dealt with clean-ups. + * Cleanup of tablesync replication origins associated with the + * subscription, if exists. 
Try to drop origins by creating all origin + * names created for this subscription. * * Note that the state can't change because we have already stopped both * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. + * + * XXX: This can be handled better instead of looping through all possible */ - rstates = GetSubscriptionRelations(subid, true); - foreach(lc, rstates) + for (int64 i = 1; i <= lastusedid; i++) { - SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); - Oid relid = rstate->relid; - - /* Only cleanup resources of tablesync workers */ - if (!OidIsValid(relid)) - continue; + char originname_to_drop[NAMEDATALEN] = {0}; - /* - * Drop the tablesync's origin tracking if exists. - * - * It is possible that the origin is not yet created for tablesync - * worker so passing missing_ok = true. This can happen for the states - * before SUBREL_STATE_FINISHEDCOPY. - */ - ReplicationOriginNameForLogicalRep(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + snprintf(originname_to_drop, sizeof(originname_to_drop), "pg_%u_%lld", subid, (long long) i); + /* missing_ok = true, since the origin might be already dropped. */ + replorigin_drop_by_name(originname_to_drop, true, false); } + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); @@ -1694,39 +1715,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { - foreach(lc, rstates) - { - SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); - Oid relid = rstate->relid; + List *slots = NULL; - /* Only cleanup resources of tablesync workers */ - if (!OidIsValid(relid)) - continue; - /* - * Drop the tablesync slots associated with removed tables. - * - * For SYNCDONE/READY states, the tablesync slot is known to have - * already been dropped by the tablesync worker. 
- * - * For other states, there is no certainty, maybe the slot does - * not exist yet. Also, if we fail after removing some of the - * slots, next time, it will again try to drop already dropped - * slots and fail. For these reasons, we allow missing_ok = true - * for the drop. - */ - if (rstate->state != SUBREL_STATE_SYNCDONE) - { - char syncslotname[NAMEDATALEN] = {0}; + slots = GetReplicationSlotNamesBySubId(wrconn, subid, true); + foreach(lc, slots) + { + char *syncslotname = (char *) lfirst(lc); - ReplicationSlotNameForTablesync(subid, relid, syncslotname, - sizeof(syncslotname)); - ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); - } + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } - list_free(rstates); - /* * If there is a slot associated with the subscription, then drop the * replication slot at the publisher. @@ -1743,6 +1742,71 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) table_close(rel, NoLock); } +/* + * GetReplicationSlotNamesBySubId + * + * Get the replication slot names associated with the subscription. + */ +List * +GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok) +{ + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {NAMEOID}; + List *tablelist = NIL; + + Assert(wrconn); + + load_file("libpqwalreceiver", false); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT slot_name" + " FROM pg_replication_slots" + " WHERE slot_name LIKE 'pg_%i_sync_%%';", + subid); + PG_TRY(); + { + WalRcvExecResult *res; + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + + if (res->status != WALRCV_OK_TUPLES) + { + ereport(ERROR, + errmsg("could not receive list of slots associated with the subscription %u, error: %s", + subid, res->err)); + } + + /* Process tables. 
*/ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *repslotname; + char *slotattr; + bool isnull; + + slotattr = NameStr(*DatumGetName(slot_getattr(slot, 1, &isnull))); + Assert(!isnull); + + repslotname = palloc(sizeof(char) * strlen(slotattr) + 1); + memcpy(repslotname, slotattr, sizeof(char) * strlen(slotattr)); + repslotname[strlen(slotattr)] = '\0'; + tablelist = lappend(tablelist, repslotname); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + } + PG_FINALLY(); + { + pfree(cmd.data); + } + PG_END_TRY(); + return tablelist; +} + /* * Drop the replication slot at the publisher node using the replication * connection. @@ -2155,6 +2219,7 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) { ListCell *lc; + LogicalRepWorker *worker; foreach(lc, rstates) { @@ -2165,18 +2230,20 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) if (!OidIsValid(relid)) continue; + /* Check if there is a sync worker for the relation */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(subid, relid, false); + LWLockRelease(LogicalRepWorkerLock); + /* * Caller needs to ensure that relstate doesn't change underneath us. * See DropSubscription where we get the relstates. 
*/ - if (rstate->state != SUBREL_STATE_SYNCDONE) + if (worker && + rstate->state != SUBREL_STATE_SYNCDONE) { - char syncslotname[NAMEDATALEN] = {0}; - - ReplicationSlotNameForTablesync(subid, relid, syncslotname, - sizeof(syncslotname)); elog(WARNING, "could not drop tablesync replication slot \"%s\"", - syncslotname); + worker->slot_name); } } diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1d4e83c4c1..d2c70dffbc 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -440,7 +440,8 @@ pa_launch_parallel_worker(void) MySubscription->name, MyLogicalRepWorker->userid, InvalidOid, - dsm_segment_handle(winfo->dsm_seg)); + dsm_segment_handle(winfo->dsm_seg), + InvalidRepSlotId); if (launched) { diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 945619b603..9f378c311e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -304,7 +304,7 @@ logicalrep_workers_find(Oid subid, bool only_running) */ bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm) + Oid relid, dsm_handle subworker_dsm, int64 slotid) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -430,6 +430,9 @@ retry: worker->launch_time = now; worker->in_use = true; worker->generation++; + worker->created_slot = false; + worker->rep_slot_id = slotid; + worker->slot_name = (char *) palloc(NAMEDATALEN); worker->proc = NULL; worker->dbid = dbid; worker->userid = userid; @@ -1184,7 +1187,8 @@ ApplyLauncherMain(Datum main_arg) ApplyLauncherSetWorkerStartTime(sub->oid, now); logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + InvalidRepSlotId); } else { diff --git 
a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d455d97f2f..f6fd3b0a30 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -318,40 +318,29 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); + CommitTransactionCommand(); /* - * End streaming so that LogRepWorkerWalRcvConn can be used to drop - * the slot. - */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); - - /* - * Cleanup the tablesync slot. + * Cleanup the tablesync slot. If the slot name used by this worker is + * different from the default slot name for the worker, this means the + * current table had started to being synchronized by another worker + * and replication slot. And this worker is reusing a replication slot + * from a previous attempt. We do not need that replication slot + * anymore. * * This has to be done after updating the state because otherwise if * there is an error while doing the database operations we won't be * able to rollback dropped slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->rep_slot_id, syncslotname, sizeof(syncslotname)); /* - * It is important to give an error if we are unable to drop the slot, - * otherwise, it won't be dropped till the corresponding subscription - * is dropped. So passing missing_ok = false. - */ - ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); - - CommitTransactionCommand(); - pgstat_report_stat(false); - - /* - * Start a new transaction to clean up the tablesync origin tracking. - * This transaction will be ended within the finish_sync_worker(). - * Now, even, if we fail to remove this here, the apply worker will - * ensure to clean it up afterward. + * We are safe to drop the replication tracking origin after this + * point. 
Now, even, if we fail to remove this here, the apply worker + * will ensure to clean it up afterward. * * We need to do this after the table state is set to SYNCDONE. * Otherwise, if an error occurs while performing the database @@ -360,32 +349,73 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * have been cleared before restart. So, the restarted worker will use * invalid replication progress state resulting in replay of * transactions that have already been applied. + * + * Firstly reset the origin session to remove the ownership of the + * slot. This is needed to allow the origin to be dropped or reused + * later. */ + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + StartTransactionCommand(); + if (MyLogicalRepWorker->slot_name && strcmp(syncslotname, MyLogicalRepWorker->slot_name) != 0) + { + /* + * End streaming so that LogRepWorkerWalRcvConn can be used to + * drop the slot. + */ + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, MyLogicalRepWorker->slot_name, false); - ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* + * Drop replication origin + * + * There is a chance that the user is concurrently performing refresh + * for the subscription where we remove the table state and its origin + * or the apply worker would have removed this origin. So passing + * missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + } /* - * Resetting the origin session removes the ownership of the slot. - * This is needed to allow the origin to be dropped. 
+ * We are safe to remove persisted replication slot and origin data, + * since it's already in SYNCDONE state. They will not be needed + * anymore. */ - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + UpdateSubscriptionRel(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn, + NULL, + NULL); + ereport(DEBUG2, + (errmsg("process_syncing_tables_for_sync: updated originname: %s, slotname: %s, state: %c for relation \"%u\" in subscription \"%u\".", + "NULL", + "NULL", + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->subid))); + CommitTransactionCommand(); + pgstat_report_stat(false); /* - * Drop the tablesync's origin tracking if exists. - * - * There is a chance that the user is concurrently performing refresh - * for the subscription where we remove the table state and its origin - * or the apply worker would have removed this origin. So passing - * missing_ok = true. + * This should return the default origin name for the worker. Even if + * the worker used a different origin for this table, it should be + * dropped and removed from the catalog so far. */ - replorigin_drop_by_name(originname, true, false); + StartTransactionCommand(); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); /* Sync worker has completed synchronization of the current table. 
*/ MyLogicalRepWorker->is_sync_completed = true; @@ -482,6 +512,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (current_lsn >= rstate->lsn) { char originname[NAMEDATALEN]; + bool is_origin_null = true; rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; @@ -502,18 +533,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. */ - ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + GetSubscriptionRelOrigin(MyLogicalRepWorker->subid, + rstate->relid, originname, + &is_origin_null); + + if (!is_origin_null) + { + replorigin_drop_by_name(originname, true, false); + } /* * Update the state to READY only after the origin cleanup. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn); + UpdateSubscriptionRel(MyLogicalRepWorker->subid, + rstate->relid, + rstate->state, + rstate->lsn, + NULL, + NULL); + ereport(DEBUG2, + (errmsg("process_syncing_tables_for_apply: updated originname: %s, slotname: %s, state: %c for relation \"%u\" in subscription \"%u\".", + "NULL", "NULL", rstate->state, + rstate->relid, MyLogicalRepWorker->subid))); + + CommitTransactionCommand(); + started_tx = false; } } else @@ -602,12 +646,25 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { + if (IsTransactionState()) + CommitTransactionCommand(); + StartTransactionCommand(); + started_tx = true; + + MySubscription->lastusedid++; + UpdateSubscriptionLastSlotId(MyLogicalRepWorker->subid, + MySubscription->lastusedid); + ereport(DEBUG2, + (errmsg("process_syncing_tables_for_apply: incremented lastusedid to %lld for subscription %u", + (long long) MySubscription->lastusedid, MySubscription->oid))); + 
logicalrep_worker_launch(MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + MySubscription->lastusedid); hentry->last_start_time = now; } } @@ -1230,8 +1287,8 @@ copy_table(Relation rel) * The name must not exceed NAMEDATALEN - 1 because of remote node constraints * on slot name length. We append system_identifier to avoid slot_name * collision with subscriptions in other clusters. With the current scheme - * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum - * length of slot_name will be 50. + * pg_%u_sync_%lu_UINT64_FORMAT (3 + 10 + 6 + 20 + 20 + '\0'), the maximum + * length of slot_name will be 45. * * The returned slot name is stored in the supplied buffer (syncslotname) with * the given size. @@ -1242,11 +1299,11 @@ copy_table(Relation rel) * had changed. */ void -ReplicationSlotNameForTablesync(Oid suboid, Oid relid, +ReplicationSlotNameForTablesync(Oid suboid, int64 slotid, char *syncslotname, Size szslot) { - snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, - relid, GetSystemIdentifier()); + snprintf(syncslotname, szslot, "pg_%u_sync_%lld_" UINT64_FORMAT, suboid, + (long long) slotid, GetSystemIdentifier()); } /* @@ -1290,6 +1347,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UserContext ucxt; bool must_use_password; bool run_as_owner; + char *prev_slotname; /* Check the state of the table synchronization. */ StartTransactionCommand(); @@ -1324,7 +1382,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Calculate the name of the tablesync slot. 
*/ slotname = (char *) palloc(NAMEDATALEN); ReplicationSlotNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->rep_slot_id, slotname, NAMEDATALEN); @@ -1356,12 +1414,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); + /* + * See if tablesync of the current relation has been started with another + * replication slot. + * + * Read previous slot name from the catalog, if exists. + */ + prev_slotname = (char *) palloc(NAMEDATALEN); + StartTransactionCommand(); + GetSubscriptionRelReplicationSlot(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + prev_slotname); + /* Assign the origin tracking record name. */ ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, originname, sizeof(originname)); + CommitTransactionCommand(); + if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) { /* @@ -1375,10 +1447,53 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * breakdown then it wouldn't have succeeded so trying it next time * seems like a better bet. */ - ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true); + if (strlen(prev_slotname) > 0) + { + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, prev_slotname, true); + + StartTransactionCommand(); + /* Replication origin might still exist. 
Try to drop */ + replorigin_drop_by_name(originname, true, false); + + /* + * Remove replication slot and origin name from the relation's + * catalog record + */ + UpdateSubscriptionRel(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn, + NULL, + NULL); + CommitTransactionCommand(); + ereport(DEBUG2, + (errmsg("LogicalRepSyncTableStart: updated originname: %s, slotname: %s, state: %c for relation \"%u\" in subscription \"%u\".", + "NULL", "NULL", MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relid, MyLogicalRepWorker->subid))); + } } else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) { + /* + * At this point, the table that is currently being synchronized + * should have its replication slot name filled in the catalog. The + * tablesync process was started with another sync worker and + * replication slot. We need to continue using the same replication + * slot in this worker too. + */ + if (strlen(prev_slotname) == 0) + { + elog(ERROR, "Replication slot could not be found for subscription %u, relation %u", + MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid); + } + + /* + * Proceed with the correct replication slot. Use previously created + * replication slot to sync this table. + */ + memcpy(slotname, prev_slotname, NAMEDATALEN); + /* * The COPY phase was previously done, but tablesync then crashed * before it was able to finish normally. @@ -1398,7 +1513,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) goto copy_table_done; } + pfree(prev_slotname); + /* Preparing for table copy operation */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; @@ -1406,11 +1523,31 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Update the state and make it visible to others. 
*/ StartTransactionCommand(); - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + + /* + * Refresh the originname in case of having non-existing origin + * from previous failed sync attempts. + * If that's the case, it should be removed from the catalog so far. + * Then, we can continue by reusing the origin created by the current + * worker instead of the stale one. + */ + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + UpdateSubscriptionRel(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn, + slotname, + originname); CommitTransactionCommand(); + ereport(DEBUG2, + (errmsg("LogicalRepSyncTableStart: updated originname: %s, slotname: %s, state: %c for relation \"%u\" in subscription \"%u\".", + originname, slotname, MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relid, MyLogicalRepWorker->subid))); + pgstat_report_stat(true); StartTransactionCommand(); @@ -1438,48 +1575,96 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) res->err))); walrcv_clear_result(res); + originid = replorigin_by_name(originname, true); + /* * Create a new permanent logical decoding slot. This slot will be used * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. + * + * Replication slot will only be created if either this is the first run + * of the worker or we're not using a previous replication slot. */ - walrcv_create_slot(LogRepWorkerWalRcvConn, - slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); - - /* - * Setup replication origin tracking. The purpose of doing this before the - * copy is to avoid doing the copy again due to any error in setting up - * origin tracking.
- */ - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) + if (!MyLogicalRepWorker->created_slot) { + walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , + CRS_USE_SNAPSHOT, origin_startpos); + ereport(DEBUG2, + (errmsg("LogicalRepSyncTableStart: created replication slot %s for subscription %u", + slotname, MyLogicalRepWorker->subid))); + /* - * Origin tracking does not exist, so create it now. - * - * Then advance to the LSN got from walrcv_create_slot. This is WAL - * logged for the purpose of recovery. Locks are to prevent the - * replication origin from vanishing while advancing. + * Remember that we created the slot so that we will not try to create + * it again. */ - originid = replorigin_create(originname); - - LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, - true /* go backward */ , true /* WAL log */ ); - UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->created_slot = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); - replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + /* + * Setup replication origin tracking. The purpose of doing this before + * the copy is to avoid doing the copy again due to any error in + * setting up origin tracking. + */ + if (!OidIsValid(originid)) + { + /* + * Origin tracking does not exist, so create it now. + */ + originid = replorigin_create(originname); + } + else + { + /* + * At this point, there shouldn't be any existing replication + * origin with the same name. 
+ */ + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication origin \"%s\" already exists", + originname))); + } } else { - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("replication origin \"%s\" already exists", - originname))); + /* + * Do not create a new replication slot, reuse the existing one + * instead. Use a new snapshot for the replication slot to ensure that + * tablesync and apply processes are consistent with each other. + */ + WalRcvStreamOptions options; + int server_version; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); + options.proto.logical.proto_version = + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + options.proto.logical.publication_names = MySubscription->publications; + + walrcv_slot_snapshot(LogRepWorkerWalRcvConn, slotname, &options, origin_startpos); + ereport(DEBUG2, + (errmsg("LogicalRepSyncTableStart: reusing replication slot %s for relation %u in subscription %u", + slotname, MyLogicalRepWorker->relid, + MyLogicalRepWorker->subid))); } + /* + * Advance to the LSN got from walrcv_create_slot or walrcv_slot_snapshot. + * This is WAL logged for the purpose of recovery. Locks are to prevent + * the replication origin from vanishing while advancing. + * + * Then setup replication origin tracking. + */ + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + + replorigin_session_setup(originid, 0); + replorigin_session_origin = originid; + /* * Make sure that the copy command runs as the table owner, unless the * user has opted out of that behaviour.
@@ -1538,12 +1723,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Update the persisted state to indicate the COPY phase is done; make it * visible to others. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - SUBREL_STATE_FINISHEDCOPY, - MyLogicalRepWorker->relstate_lsn); + UpdateSubscriptionRel(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_FINISHEDCOPY, + MyLogicalRepWorker->relstate_lsn, + slotname, + originname); CommitTransactionCommand(); + ereport(DEBUG2, + (errmsg("LogicalRepSyncTableStart: updated originname: %s, slotname: %s, state: %c for relation \"%u\" in subscription \"%u\".", + originname, slotname, SUBREL_STATE_FINISHEDCOPY, + MyLogicalRepWorker->relid, MyLogicalRepWorker->subid))); copy_table_done: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4eb67ebd26..c3a6aa3894 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -466,8 +466,16 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, { if (OidIsValid(relid)) { - /* Replication origin name for tablesync workers. */ - snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); + bool is_null = true; + + /* + * Replication origin name for tablesync workers. First, look into the + * catalog. If originname does not exist, then use the default name. + */ + GetSubscriptionRelOrigin(suboid, relid, + originname, &is_null); + if (is_null) + snprintf(originname, szoriginname, "pg_%u_%lld", suboid, (long long) MyLogicalRepWorker->rep_slot_id); } else { @@ -4503,6 +4511,9 @@ start_table_sync(XLogRecPtr *origin_startpos, /* allocate slot name in long-lived context */ *myslotname = MemoryContextStrdup(ApplyContext, syncslotname); + + /* Keep the replication slot name used for this sync. 
*/ + MyLogicalRepWorker->slot_name = *myslotname; pfree(syncslotname); } @@ -4555,13 +4566,25 @@ run_tablesync_worker(WalRcvStreamOptions *options, { MyLogicalRepWorker->is_sync_completed = false; + /* + * If it's already connected to the publisher, end streaming before using + * the same connection for another iteration + */ + if (LogRepWorkerWalRcvConn != NULL) + { + TimeLineID tli; + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + } + /* Start table synchronization. */ start_table_sync(origin_startpos, &slotname); + StartTransactionCommand(); ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, originname, originname_size); + CommitTransactionCommand(); set_apply_error_context_origin(originname); @@ -4601,11 +4624,10 @@ run_apply_worker(WalRcvStreamOptions *options, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("subscription has no replication slot set"))); - ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, - originname, originname_size); - /* Setup replication origin tracking. */ StartTransactionCommand(); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, originname_size); originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) originid = replorigin_create(originname); @@ -4921,7 +4943,32 @@ TablesyncWorkerMain(Datum main_arg) } if (!is_table_found) + { + TimeLineID tli; + + /* + * It is important to give an error if we are unable to drop the + * slot, otherwise, it won't be dropped till the corresponding + * subscription is dropped. So passing missing_ok = false. + */ + if (MyLogicalRepWorker->created_slot) + { + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, MyLogicalRepWorker->slot_name, false); + } + + /* + * Drop replication origin before exiting. 
+ * + * There is a chance that the user is concurrently performing refresh + * for the subscription where we remove the table state and its origin + * or the apply worker would have removed this origin. So passing + * missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + break; + } } } diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 1d40eebc78..7e13f59847 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -108,6 +108,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* Only publish data originating from the specified origin */ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); + + /* The last used ID to create a replication slot for tablesync */ + int64 sublastusedid BKI_DEFAULT(0); #endif } FormData_pg_subscription; @@ -144,6 +147,8 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + int64 lastusedid; /* Last used unique ID to create replication + * slots in tablesync */ } Subscription; /* Disallow streaming in-progress transactions. 
*/ @@ -164,6 +169,7 @@ typedef struct Subscription extern Subscription *GetSubscription(Oid subid, bool missing_ok); extern void FreeSubscription(Subscription *sub); extern void DisableSubscription(Oid subid); +extern void UpdateSubscriptionLastSlotId(Oid subid, int64 lastusedid); extern int CountDBSubscriptions(Oid dbid); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 60a2bcca23..185164d75e 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -44,6 +44,12 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId) * used for synchronization * coordination, or NULL if not * valid */ + NameData srrelslotname BKI_FORCE_NULL; /* name of the replication + * slot for relation in + * subscription */ + NameData srreloriginname BKI_FORCE_NULL; /* origin name for relation in + * subscription */ + #endif } FormData_pg_subscription_rel; @@ -81,10 +87,16 @@ typedef struct SubscriptionRelState } SubscriptionRelState; extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn); + XLogRecPtr sublsn, char *relslotname, char *reloriginname); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); +extern void UpdateSubscriptionRel(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, char *relslotname, char *reloriginname); + extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); +extern void GetSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *slotname); +extern void GetSubscriptionRelOrigin(Oid subid, Oid relid, char *reloriginname, bool *isnull); + extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8a89dc784..31b2c41893 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -237,8 +237,9 @@ extern bool 
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); -extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); +extern void ReplicationSlotNameForTablesync(Oid suboid, int64 slotid, char *syncslotname, Size szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); +extern List *GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 32783a8cdd..4b2148d13f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -35,6 +35,23 @@ typedef struct LogicalRepWorker /* Indicates if this slot is used or free. */ bool in_use; + /* + * Indicates if the sync worker created a replication slot for itself + * at any point of its lifetime. + * False means that the worker has not created a slot yet, and has been + * reusing replication slots created by other workers so far. + */ + bool created_slot; + + /* + * Unique identifier for replication slot to be created by tablesync + * workers, if needed. + */ + int64 rep_slot_id; + + /* Replication slot name used by the worker. */ + char *slot_name; + /* Increased every time the slot is taken by new worker.
*/ uint16 generation; @@ -242,7 +259,8 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, extern List *logicalrep_workers_find(Oid subid, bool only_running); extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, - dsm_handle subworker_dsm); + dsm_handle subworker_dsm, + int64 slotid); extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); @@ -338,4 +356,7 @@ am_parallel_apply_worker(void) return isParallelApplyWorker(MyLogicalRepWorker); } +/* Invalid identifier to be used for naming replication slots */ +#define InvalidRepSlotId 0 + #endif /* WORKER_INTERNAL_H */ -- 2.27.0