From 62fa846893211523762e4b22f4e8abda29b49ee2 Mon Sep 17 00:00:00 2001
From: Shlok Kyal
Date: Fri, 25 Apr 2025 12:36:10 +0530
Subject: [PATCH v1] Fix duplicate insert during pg_createsubscriber

When pg_createsubscriber is run, the standby node is recovered to a point
'consistent_lsn' and promoted. Then, when the subscription is created, the
replication origin is advanced to 'consistent_lsn'. Finally the
subscription is enabled and the apply worker starts to receive changes
from 'consistent_lsn'.

When 'consistent_lsn' is the LSN of a COMMIT record, the changes of the
transaction ended by that COMMIT have already been replicated to the
standby node during the recovery phase. Because the replication origin is
advanced only to 'consistent_lsn' before the subscription is enabled, the
apply worker starts to apply changes from 'consistent_lsn', so the changes
of the transaction whose COMMIT LSN is 'consistent_lsn' are replicated
again.

To avoid the above scenario, the replication origin should be advanced to
the next LSN after 'consistent_lsn'. This patch introduces a new SQL
function, pg_get_next_lsn(), which returns the next LSN of a given LSN.
pg_createsubscriber uses this function and advances the replication origin
to the next LSN after 'consistent_lsn'.
---
 src/backend/access/transam/xlogfuncs.c      |  42 +++++++
 src/backend/access/transam/xlogutils.c      | 130 ++++++++++++++++++++
 src/bin/pg_basebackup/pg_createsubscriber.c |  28 ++++-
 src/include/access/xlogutils.h              |   8 ++
 src/include/catalog/pg_proc.dat             |   4 +
 5 files changed, 211 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 8c3090165f0..5492e956a19 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -22,6 +22,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogbackup.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -748,3 +749,44 @@ pg_promote(PG_FUNCTION_ARGS)
 						   wait_seconds)));
 	PG_RETURN_BOOL(false);
 }
+
+/*
+ * pg_get_next_lsn
+ *
+ * Return the end LSN (EndRecPtr) of the first valid WAL record that
+ * InitXLogReaderState() locates for the given LSN.
+ *
+ * Raises an error if the given LSN is beyond the WAL currently available on
+ * this server (see GetCurrentLSN()), or if no record can be read there.
+ */
+Datum
+pg_get_next_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	curr_lsn = GetCurrentLSN();
+	XLogRecPtr	result;
+	XLogReaderState *xlogreader;
+
+	if (lsn > curr_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL input LSN must be less than current LSN"),
+				 errdetail("Current WAL LSN on the database system is at %X/%X.",
+						   LSN_FORMAT_ARGS(curr_lsn))));
+
+	xlogreader = InitXLogReaderState(lsn);
+
+	/* ReadNextXLogRecord() returns NULL only when the end of WAL is reached */
+	if (!ReadNextXLogRecord(xlogreader))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not read WAL at %X/%X",
+						LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+
+	result = GetEndLSN(xlogreader);
+
+	pfree(xlogreader->private_data);
+	XLogReaderFree(xlogreader);
+
+	PG_RETURN_LSN(result);
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index c389b27f77d..4b139aee54f 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1032,3 +1032,133 @@ WALReadRaiseError(WALReadError *errinfo)
 						errinfo->wre_req)));
 	}
 }
+
+/*
+ * Read the next WAL record from the given reader.
+ *
+ * By design, to be less intrusive in a running system, no slot is allocated
+ * to reserve the WAL we're about to read. Therefore this function can
+ * encounter read errors for historical WAL.
+ *
+ * Callers are expected to request only WAL that already exists (see
+ * GetCurrentLSN()), but the read can still hit the end of WAL if that
+ * pointer falls in the middle of a record. In that case we return NULL;
+ * any other read failure is reported with ereport(ERROR).
+ */
+XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		ReadLocalXLogPageNoWaitPrivate *private_data;
+
+		/* return NULL, if end of WAL is reached */
+		private_data = (ReadLocalXLogPageNoWaitPrivate *)
+			xlogreader->private_data;
+
+		if (private_data->end_of_wal)
+			return NULL;
+
+		if (errormsg)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X: %s",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+	}
+
+	return record;
+}
+
+/*
+ * Return the LSN up to which the server has WAL: the flush pointer in normal
+ * operation, or the replay pointer while recovery is in progress.
+ */
+XLogRecPtr
+GetCurrentLSN(void)
+{
+	XLogRecPtr	curr_lsn;
+
+	/*
+	 * We determine the current LSN of the server similar to how page_read
+	 * callback read_local_xlog_page_no_wait does.
+	 */
+	if (!RecoveryInProgress())
+		curr_lsn = GetFlushRecPtr(NULL);
+	else
+		curr_lsn = GetXLogReplayRecPtr(NULL);
+
+	Assert(!XLogRecPtrIsInvalid(curr_lsn));
+
+	return curr_lsn;
+}
+
+/*
+ * Initialize WAL reader and identify first valid LSN.
+ *
+ * XLogFindNextRecord() is used to locate a valid record to start from for
+ * 'lsn'; an ERROR is raised if none is found.  The caller is responsible for
+ * releasing the reader: pfree() its private_data, then XLogReaderFree() it.
+ */
+XLogReaderState *
+InitXLogReaderState(XLogRecPtr lsn)
+{
+	XLogReaderState *xlogreader;
+	ReadLocalXLogPageNoWaitPrivate *private_data;
+	XLogRecPtr	first_valid_record;
+
+	/*
+	 * Reading WAL below the first page of the first segments isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	private_data = (ReadLocalXLogPageNoWaitPrivate *)
+		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									private_data);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+		ereport(ERROR,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	return xlogreader;
+}
+
+/*
+ * Return the reader's EndRecPtr, i.e. the end position of the WAL record it
+ * has most recently read.
+ */
+XLogRecPtr
+GetEndLSN(XLogReaderState *record)
+{
+	return record->EndRecPtr;
+}
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index f65acc7cb11..9ff140a34f0 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1846,6 +1846,7 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
 	char	   *dbname;
 	char	   *originname;
 	char	   *lsnstr;
+	char	   *nextlsn;
 
 	Assert(conn != NULL);
 
@@ -1886,6 +1887,30 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
 
 	PQclear(res);
 
+	/*
+	 * A transaction whose COMMIT record is at the consistent LSN was already
+	 * replayed during recovery.  Look up the next LSN so that the replication
+	 * origin is advanced past it and the apply worker does not apply that
+	 * transaction a second time.
+	 */
+	resetPQExpBuffer(str);
+	appendPQExpBuffer(str, "SELECT pg_catalog.pg_get_next_lsn('%s')", lsnstr);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain the next lsn of consistent lsn: %s",
+						 PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		nextlsn = pg_strdup(PQgetvalue(res, 0, 0));
+		PQclear(res);
+	}
+	else
+		nextlsn = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+
 	/*
 	 * The origin name is defined as pg_%u. %u is the subscription OID. See
 	 * ApplyWorkerMain().
@@ -1898,7 +1923,8 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
 	resetPQExpBuffer(str);
 	appendPQExpBuffer(str,
 					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
-					  originname, lsnstr);
+					  originname, nextlsn);
+	pg_free(nextlsn);
 
 	pg_log_debug("command is: %s", str->data);
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index a1870d8e5aa..7ebe5ea251d 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -118,4 +118,12 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+
+extern XLogRecPtr GetCurrentLSN(void);
+
+extern XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
+
+extern XLogRecPtr GetEndLSN(XLogReaderState *record);
+
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 62beb71da28..6cc971b2855 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12566,4 +12566,8 @@
   proargnames => '{pid,io_id,io_generation,state,operation,off,length,target,handle_data_len,raw_result,result,target_desc,f_sync,f_localmem,f_buffered}',
   prosrc => 'pg_get_aios' },
 
+{ oid => '3063', descr => 'next lsn value',
+  proname => 'pg_get_next_lsn', provolatile => 'v', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn', prosrc => 'pg_get_next_lsn' },
+
 ]
-- 
2.34.1