From 58ba77638796cba0712e63c4a2d0a6762fd2081c Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Tue, 25 Jul 2023 16:22:39 +0200 Subject: [PATCH 7/7] Catchup up to a LSN after copy of the sequence --- src/backend/replication/logical/tablesync.c | 51 +++++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index cbd1c6426c6..694ce70b7d8 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -118,6 +118,7 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -1250,12 +1251,52 @@ fetch_sequence_data(char *nspname, char *relname) return value; } +/* + * Fetch remote insert LSN from the remote node. + */ +static XLogRecPtr +fetch_remote_lsn(void) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {LSNOID}; + XLogRecPtr value = InvalidXLogRecPtr; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT pg_current_wal_lsn()"); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive current LSN from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + value = DatumGetLSN(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return value; +} + /* * Copy existing data of a sequence from publisher. * * Caller is responsible for locking the local relation. */ -static void +static XLogRecPtr copy_sequence(Relation rel) { LogicalRepRelMapEntry *relmapentry; @@ -1299,6 +1340,9 @@ copy_sequence(Relation rel) SetSequence(RelationGetRelid(rel), false, sequence_value); logicalrep_rel_close(relmapentry, NoLock); + + /* also fetch current remote LSN (after the data was selected) */ + return fetch_remote_lsn(); } /* @@ -1349,6 +1393,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UserContext ucxt; bool must_use_password; bool run_as_owner; + XLogRecPtr remote_lsn = InvalidXLogRecPtr; /* Check the state of the table synchronization. */ StartTransactionCommand(); @@ -1567,7 +1612,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { /* Now do the initial sequence copy */ PushActiveSnapshot(GetTransactionSnapshot()); - copy_sequence(rel); + remote_lsn = copy_sequence(rel); PopActiveSnapshot(); } else @@ -1616,7 +1661,7 @@ copy_table_done: */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; - MyLogicalRepWorker->relstate_lsn = *origin_startpos; + MyLogicalRepWorker->relstate_lsn = Max(*origin_startpos, remote_lsn); SpinLockRelease(&MyLogicalRepWorker->relmutex); /* -- 2.41.0