From b93e330a1ffe5c43d09f72e38726363bcb49d890 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Wed, 30 Dec 2020 15:02:21 +1100 Subject: [PATCH v9] WIP patch for the Solution1. ==== Features: * The tablesync slot is now permanent instead of temporary. The tablesync slot name is no longer tied to the Subscription slot name. * The tablesync slot cleanup (drop) code is added to the DropSubscription and finish_sync_worker functions. * The tablesync worker now allows multiple transactions instead of a single transaction. * A new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart. * If a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase. * The tablesync sets up replication origin tracking in LogicalRepSyncTableStart (similar to what is done for the apply worker). The origin is advanced when first created. * The tablesync replication origin tracking is cleaned up during DropSubscription and/or process_syncing_tables_for_apply. * The DropSubscription cleanup code was enhanced in v7 to take care of crashed sync workers. 
* Minor updates to PG docs TODO / Known Issues: * Source includes temporary "!!>>" excessive logging which I added to help testing during development * Address review comments --- doc/src/sgml/catalogs.sgml | 1 + src/backend/commands/subscriptioncmds.c | 222 +++++++++++++++++++------- src/backend/replication/logical/origin.c | 4 +- src/backend/replication/logical/tablesync.c | 232 ++++++++++++++++++++++++---- src/backend/replication/logical/worker.c | 18 +-- src/include/catalog/pg_subscription_rel.h | 1 + src/include/replication/slot.h | 3 + 7 files changed, 376 insertions(+), 105 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d988636..266615c 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7651,6 +7651,7 @@ SCRAM-SHA-256$<iteration count>:&l State code: i = initialize, d = data is being copied, + C = table data has been copied, s = synchronized, r = ready (normal replication) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1696454..c366614 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -37,6 +37,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/worker_internal.h" +#include "replication/slot.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -928,7 +929,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) char *err = NULL; RepOriginId originid; WalReceiverConn *wrconn = NULL; - StringInfoData cmd; Form_pg_subscription form; /* @@ -1016,76 +1016,188 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ReleaseSysCache(tup); /* - * Stop all the subscription workers immediately. - * - * This is necessary if we are dropping the replication slot, so that the - * slot becomes accessible. + * Try to acquire the connection necessary for dropping slots. 
+ * We do this here so that the same connection may be shared + * for dropping the Subscription slot, as well as dropping any + * tablesync slots. * - * It is also necessary if the subscription is disabled and was disabled - * in the same transaction. Then the workers haven't seen the disabling - * yet and will still be running, leading to hangs later when we want to - * drop the replication origin. If the subscription was disabled before - * this transaction, then there shouldn't be any workers left, so this - * won't make a difference. - * - * New workers won't be started because we hold an exclusive lock on the - * subscription till the end of the transaction. + * Note: If the slotname is NONE/NULL then connection errors are + * suppressed. This is necessary so that the DROP SUBSCRIPTION + * can still complete even when the connection to publisher is + * broken. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - subworkers = logicalrep_workers_find(subid, false); - LWLockRelease(LogicalRepWorkerLock); - foreach(lc, subworkers) + load_file("libpqwalreceiver", false); + + wrconn = walrcv_connect(conninfo, true, subname, &err); + if (wrconn == NULL && slotname != NULL) + ereport(ERROR, + (errmsg("could not connect to publisher when attempting to " + "drop the replication slot \"%s\"", slotname), + errdetail("The error was: %s", err), + /* translator: %s is an SQL ALTER command */ + errhint("Use %s to disassociate the subscription from the slot.", + "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); + + PG_TRY(); { - LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + /* + * Stop all the subscription workers immediately. + * + * This is necessary if we are dropping the replication slot, so that the + * slot becomes accessible. + * + * It is also necessary if the subscription is disabled and was disabled + * in the same transaction. 
Then the workers haven't seen the disabling + * yet and will still be running, leading to hangs later when we want to + * drop the replication origin. If the subscription was disabled before + * this transaction, then there shouldn't be any workers left, so this + * won't make a difference. + * + * New workers won't be started because we hold an exclusive lock on the + * subscription till the end of the transaction. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + subworkers = logicalrep_workers_find(subid, false); + LWLockRelease(LogicalRepWorkerLock); + foreach(lc, subworkers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); - } - list_free(subworkers); + logicalrep_worker_stop(w->subid, w->relid); + } + list_free(subworkers); - /* Clean up dependencies */ - deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); + /* + * Tablesync resource cleanup (slots and origins). + * + * Any READY-state relations would already have dealt with clean-ups. + */ + { + List *rstates; + ListCell *lc; - /* Remove any associated relation synchronization states. */ - RemoveSubscriptionRel(subid, InvalidOid); + rstates = GetSubscriptionNotReadyRelations(subid); + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + Oid relid = rstate->relid; - /* Remove the origin tracking if exists. */ - snprintf(originname, sizeof(originname), "pg_%u", subid); - originid = replorigin_by_name(originname, true); - if (originid != InvalidRepOriginId) - replorigin_drop(originid, false); + /* Only cleanup the tablesync worker resources */ + if (!OidIsValid(relid)) + continue; - /* - * If there is no slot associated with the subscription, we can finish - * here. - */ - if (!slotname) + /* Drop the tablesync slot. 
*/ + { + char *syncslotname = ReplicationSlotNameForTablesync(subid, relid); + + /* + * If the subscription slotname is NONE/NULL and the connection to publisher is + * broken, but the DropSubscription should still be allowed to complete. + * But without a connection it is not possible to drop any tablesync slots. + */ + if (!wrconn) + { + /* FIXME - OK to just log a warning? */ + elog(WARNING, "!!>> DropSubscription: no connection. Cannot drop tablesync slot \"%s\".", + syncslotname); + } + else + { + PG_TRY(); + { + elog(LOG, "!!>> DropSubscription: dropping the tablesync slot \"%s\".", syncslotname); + ReplicationSlotDropAtPubNode(wrconn, syncslotname); + elog(LOG, "!!>> DropSubscription: dropped the tablesync slot \"%s\".", syncslotname); + } + PG_CATCH(); + { + /* + * Typically tablesync will delete its own slot after it reaches + * SYNCDONE state. Then the apply worker moves the tablesync from + * SYNCDONE to READY state. + * + * Rarely, the DropSubscription may be issued in between when a + * tablesync still is in SYNCDONE, but not yet reached READY state. + * If this happens then the drop slot could fail since it was + * already dropped, so suppress the error. + */ + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + pfree(syncslotname); + PG_RE_THROW(); + } + } + PG_END_TRY(); + } + pfree(syncslotname); + } + + /* Remove the tablesync's origin tracking if exists. */ + { + snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid); + originid = replorigin_by_name(originname, true); + if (originid != InvalidRepOriginId) + { + elog(LOG, "!!>> DropSubscription: dropping origin tracking for \"%s\"", originname); + replorigin_drop(originid, false); + elog(LOG, "!!>> DropSubscription: dropped origin tracking for \"%s\"", originname); + } + } + + } + list_free(rstates); + } + + /* Clean up dependencies */ + deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); + + /* Remove any associated relation synchronization states. 
*/ + RemoveSubscriptionRel(subid, InvalidOid); + + /* Remove the origin tracking if exists. */ + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, true); + if (originid != InvalidRepOriginId) + replorigin_drop(originid, false); + + /* + * If there is a slot associated with the subscription, then drop the + * replication slot at the publisher node using the replication + * connection. + */ + if (slotname) + ReplicationSlotDropAtPubNode(wrconn, slotname); + } + PG_FINALLY(); { + if (wrconn) + walrcv_disconnect(wrconn); + table_close(rel, NoLock); - return; } + PG_END_TRY(); +} + + +/* + * Drop the replication slot at the publisher node + * using the replication connection. + */ +void +ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname) +{ + StringInfoData cmd; + + Assert(wrconn); - /* - * Otherwise drop the replication slot at the publisher node using the - * replication connection. - */ load_file("libpqwalreceiver", false); initStringInfo(&cmd); appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); - wrconn = walrcv_connect(conninfo, true, subname, &err); - if (wrconn == NULL) - ereport(ERROR, - (errmsg("could not connect to publisher when attempting to " - "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err), - /* translator: %s is an SQL ALTER command */ - errhint("Use %s to disassociate the subscription from the slot.", - "ALTER SUBSCRIPTION ... 
SET (slot_name = NONE)"))); - PG_TRY(); { - WalRcvExecResult *res; + WalRcvExecResult *res; res = walrcv_exec(wrconn, cmd.data, 0, NULL); @@ -1103,13 +1215,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } PG_FINALLY(); { - walrcv_disconnect(wrconn); + pfree(cmd.data); } PG_END_TRY(); - - pfree(cmd.data); - - table_close(rel, NoLock); } /* diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 15ab8e7..6b79dc6 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -843,7 +843,7 @@ replorigin_redo(XLogReaderState *record) * that originated at the LSN remote_commit on the remote node was replayed * successfully and that we don't need to do so again. In combination with * setting up replorigin_session_origin_lsn and replorigin_session_origin - * that ensures we won't loose knowledge about that after a crash if the + * that ensures we won't lose knowledge about that after a crash if the * transaction had a persistent effect (think of asynchronous commits). * * local_commit needs to be a local LSN of the commit so that we can make sure @@ -905,7 +905,7 @@ replorigin_advance(RepOriginId node, LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE); /* Make sure it's not used by somebody else */ - if (replication_state->acquired_by != 0) + if (replication_state->acquired_by != 0 && replication_state->acquired_by != MyProcPid) { ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6259606..8180f49 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -43,13 +43,17 @@ * state to SYNCDONE. There might be zero changes applied between * CATCHUP and SYNCDONE, because the sync worker might be ahead of the * apply worker. 
+ * - The sync worker has an intermediary state COPYDONE which comes after + * CATCHUP and before SYNCDONE. This state indicates that the initial + * table copy phase has completed, so if the worker crashes before + * reaching SYNCDONE the copy will not be re-attempted. * - Once the state is set to SYNCDONE, the apply will continue tracking * the table until it reaches the SYNCDONE stream position, at which * point it sets state to READY and stops tracking. Again, there might * be zero changes in between. * * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> - * CATCHUP -> SYNCDONE -> READY. + * CATCHUP -> (sync worker COPYDONE) -> SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about * subscribed tables and their state. Some transient state during data @@ -64,6 +68,7 @@ * -> set in memory CATCHUP * -> enter wait-loop * sync:10 + * -> set in catalog COPYDONE * -> set in catalog SYNCDONE * -> exit * apply:10 @@ -79,6 +84,7 @@ * -> set in memory CATCHUP * -> continue per-table filtering * sync:10 + * -> set in catalog COPYDONE * -> set in catalog SYNCDONE * -> exit * apply:10 @@ -102,6 +108,8 @@ #include "replication/logicalrelation.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" +#include "replication/slot.h" +#include "replication/origin.h" #include "storage/ipc.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -139,6 +147,33 @@ finish_sync_worker(void) get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); + /* + * Cleanup the tablesync slot. 
+ */ + { + /* Calculate the name of the tablesync slot */ + char *syncslotname = ReplicationSlotNameForTablesync( + MySubscription->oid, + MyLogicalRepWorker->relid); + + PG_TRY(); + { + elog(LOG, "!!>> finish_sync_worker: dropping the tablesync slot \"%s\".", syncslotname); + ReplicationSlotDropAtPubNode(wrconn, syncslotname); + elog(LOG, "!!>> finish_sync_worker: dropped the tablesync slot \"%s\".", syncslotname); + } + PG_CATCH(); + { + /* + * NOP. Suppress any drop slot error because otherwise + * it would cause the tablesync to fail and re-launch. + */ + } + PG_END_TRY(); + + pfree(syncslotname); + } + /* Find the main apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); @@ -270,8 +305,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - Assert(IsTransactionState()); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && @@ -284,6 +317,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* + * UpdateSubscriptionRelState must be called within a transaction. + * That transaction will be ended within the finish_sync_worker(). + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + } + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, @@ -406,12 +448,41 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) { rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; + if (!started_tx) { StartTransactionCommand(); started_tx = true; } + /* + * Remove the tablesync origin tracking if exists. 
+ * + * The cleanup is done here instead of in the finish_sync_worker function because + * if the tablesync worker process attempted to call replorigin_drop then that will + * hang because the replorigin_drop considers the owning tablesync PID as "busy". + * + * Do this before updating the state, so that DropSubscription can know that all + * READY workers have already had their origin tracking removed. + */ + { + char originname[NAMEDATALEN]; + RepOriginId originid; + + snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid); + originid = replorigin_by_name(originname, true); + elog(LOG, "!!>> apply worker: find tablesync origin tracking for \"%s\".", originname); + if (OidIsValid(originid)) + { + elog(LOG, "!!>> apply worker: dropping tablesync origin tracking for \"%s\".", originname); + replorigin_drop(originid, false); + elog(LOG, "!!>> apply worker: dropped tablesync origin tracking for \"%s\".", originname); + } + } + + /* + * Update the state only after the origin cleanup. + */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); @@ -807,6 +878,32 @@ copy_table(Relation rel) logicalrep_rel_close(relmapentry, NoLock); } + +/* + * Determine the tablesync slot name. + * + * The returned slot name is palloc'ed in current memory context. + */ +char * +ReplicationSlotNameForTablesync(Oid suboid, Oid relid) +{ + char *syncslotname; + + /* + * To build a slot name for the sync work, we are limited to NAMEDATALEN - + * 1 characters. + * + * The name is calculated as pg_%u_sync_%u (3 + 10 + 6 + 10 + '\0'). + * (It's actually the NAMEDATALEN on the remote that matters, but this + * scheme will also work reasonably if that is different.) + */ + StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ + + syncslotname = psprintf("pg_%u_sync_%u", suboid, relid); + + return syncslotname; +} + /* * Start syncing the table in the sync worker. 
* @@ -849,17 +946,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) finish_sync_worker(); /* doesn't return */ } - /* - * To build a slot name for the sync work, we are limited to NAMEDATALEN - - * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars - * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the - * NAMEDATALEN on the remote that matters, but this scheme will also work - * reasonably if that is different.) - */ - StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ - slotname = psprintf("%.*s_%u_sync_%u", - NAMEDATALEN - 28, - MySubscription->slotname, + /* Calculate the name of the tablesync slot. */ + slotname = ReplicationSlotNameForTablesync( MySubscription->oid, MyLogicalRepWorker->relid); @@ -874,7 +962,19 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) (errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || - MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC); + MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || + MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE); + + if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE) + { + /* + * The COPY phase was previously done, but tablesync then crashed/etc + * before it was able to finish normally. + */ + elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_COPYDONE."); + StartTransactionCommand(); + goto copy_table_done; + } SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -890,9 +990,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); pgstat_report_stat(false); - /* - * We want to do the table data sync in a single transaction. - */ StartTransactionCommand(); /* @@ -918,29 +1015,98 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) walrcv_clear_result(res); /* - * Create a new temporary logical decoding slot. 
This slot will be used + * Create a new permanent logical decoding slot. This slot will be used * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. */ - walrcv_create_slot(wrconn, slotname, true, + elog(LOG, "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", slotname); + walrcv_create_slot(wrconn, slotname, false, CRS_USE_SNAPSHOT, origin_startpos); - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* + * Be sure to remove the newly created tablesync slot if the COPY fails. + */ + PG_TRY(); + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + + res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("table copy could not finish transaction on publisher"), + errdetail("The error was: %s", res->err))); + walrcv_clear_result(res); + + table_close(rel, NoLock); + + /* Make the copy visible. */ + CommandCounterIncrement(); + } + PG_CATCH(); + { + /* If something failed during copy table then cleanup the created slot. */ + elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname); + ReplicationSlotDropAtPubNode(wrconn, slotname); + elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".", slotname); - res = walrcv_exec(wrconn, "COMMIT", 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, - (errmsg("table copy could not finish transaction on publisher"), - errdetail("The error was: %s", res->err))); - walrcv_clear_result(res); + pfree(slotname); + slotname = NULL; + + PG_RE_THROW(); + } + PG_END_TRY(); + + /* Update the persisted state to indicate the COPY phase is done; make it visible to others. 
*/ + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_COPYDONE, + MyLogicalRepWorker->relstate_lsn); + +copy_table_done: + + /* Setup replication origin tracking. */ + { + char originname[NAMEDATALEN]; + RepOriginId originid; + + snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + { + /* + * Origin tracking does not exist. Create it now, and advance to LSN got from walrcv_create_slot. + */ + elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", originname); + originid = replorigin_create(originname); + elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", originname); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", originname); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + } + else + { + /* + * Origin tracking already exists. + */ + elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", originname); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", originname); + *origin_startpos = replorigin_session_get_progress(false); + } - table_close(rel, NoLock); + elog(LOG, "!!>> LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", + originname, + (uint32) (*origin_startpos >> 32), + (uint32) *origin_startpos); + } - /* Make the copy visible. */ - CommandCounterIncrement(); + CommitTransactionCommand(); /* * We are done with the initial data synchronization, update the state. 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3874939..d28cfb8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* The synchronization worker runs in single transaction. */ - if (!am_tablesync_worker()) - { - /* Commit the per-stream transaction */ - CommitTransactionCommand(); - } + /* Commit the per-stream transaction */ + CommitTransactionCommand(); in_streamed_transaction = false; @@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s) /* Cleanup the subxact info */ cleanup_subxact_info(); - /* The synchronization worker runs in single transaction */ - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); return; } @@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - if (!am_tablesync_worker()) - CommitTransactionCommand(); + CommitTransactionCommand(); } } @@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data) { - /* The synchronization worker runs in single transaction. 
*/ - if (IsTransactionState() && !am_tablesync_worker()) + if (IsTransactionState()) { /* * Update origin state so we can restart streaming from correct diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index acc2926..e9f2b3f 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs #define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ #define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn * NULL) */ +#define SUBREL_STATE_COPYDONE 'C' /* tablesync copy phase is completed */ #define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of * apply (sublsn set) */ #define SUBREL_STATE_READY 'r' /* ready (sublsn set) */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 63bab69..5f19089 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,6 +15,7 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" +#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); +extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); -- 1.8.3.1