From 67d28785b9d0a4cf10a863cc7d1265d5753a41a0 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 3 Jun 2025 15:31:47 -0700 Subject: [PATCH v1 1/2] Allow to create logical slots with no WAL reservation. Author: Reviewed-by: Discussion: https://postgr.es/m/ --- contrib/test_decoding/expected/slot.out | 32 ++++++++ contrib/test_decoding/sql/slot.sql | 12 +++ src/backend/catalog/system_functions.sql | 1 + src/backend/replication/logical/logical.c | 34 +++++++++ .../replication/logical/logicalfuncs.c | 16 ++-- src/backend/replication/slotfuncs.c | 75 +++++++++++++------ src/backend/replication/walsender.c | 12 +-- src/include/catalog/pg_proc.dat | 8 +- src/include/replication/logical.h | 7 ++ 9 files changed, 156 insertions(+), 41 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 7de03c79f6f..4475260286e 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -466,3 +466,35 @@ SELECT pg_drop_replication_slot('physical_slot'); (1 row) +-- Test logical slots with not WAL reservation. +SELECT 'init' FROM pg_create_logical_replication_slot('slot_nowal', 'test_decoding', false, false, false, false); + ?column? +---------- + init +(1 row) + +-- No WAL reserved. +SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + restart_lsn | confirmed_flush_lsn +-------------+--------------------- + | +(1 row) + +-- Use this slot and check the WAL reservation again. +SELECT data FROM pg_logical_slot_get_changes('slot_nowal', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT restart_lsn is not null as valid_restart_lsn, confirmed_flush_lsn is not null as valid_confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + valid_restart_lsn | valid_confirmed_flush_lsn +-------------------+--------------------------- + t | t +(1 row) + +SELECT pg_drop_replication_slot('slot_nowal'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 580e3ae3bef..3da5d6eab9b 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -190,3 +190,15 @@ SELECT pg_drop_replication_slot('failover_true_slot'); SELECT pg_drop_replication_slot('failover_false_slot'); SELECT pg_drop_replication_slot('failover_default_slot'); SELECT pg_drop_replication_slot('physical_slot'); + +-- Test logical slots with not WAL reservation. +SELECT 'init' FROM pg_create_logical_replication_slot('slot_nowal', 'test_decoding', false, false, false, false); + +-- No WAL reserved. +SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + +-- Use this slot and check the WAL reservation again. +SELECT data FROM pg_logical_slot_get_changes('slot_nowal', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT restart_lsn is not null as valid_restart_lsn, confirmed_flush_lsn is not null as valid_confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + +SELECT pg_drop_replication_slot('slot_nowal'); diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 566f308e443..5a801f5f3bb 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN immediately_reserve boolean DEFAULT true, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1d56d0c4ef3..2a34d2bf846 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -303,6 +303,40 @@ StartupDecodingContext(List *output_plugin_options, return ctx; } +/* + * Create the logical decoding and initialize it if necessary. This function + * can be used for logical slot that might not have been initialized yet. + */ +LogicalDecodingContext * +CreateOrInitDecodingContext(XLogRecPtr restart_lsn, + List *output_plugin_options, + bool fast_foward, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) +{ + LogicalDecodingContext *ctx; + + /* Initialize the slot with a new logical decoding if not yet */ + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) + { + ctx = CreateInitDecodingContext(NameStr(MyReplicationSlot->data.plugin), + output_plugin_options, false, + restart_lsn, xl_routine, + prepare_write, do_write, update_progress); + + DecodingContextFindStartpoint(ctx); + + FreeDecodingContext(ctx); + } + + ctx = CreateDecodingContext(restart_lsn, output_plugin_options, fast_foward, + xl_routine, prepare_write, do_write, update_progress); + + return ctx; +} + /* * Create a new decoding context, for a new logical slot. * diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index ca53caac2f2..8469280a37b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -202,14 +202,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin PG_TRY(); { /* restart at slot's confirmed_flush */ - ctx = CreateDecodingContext(InvalidXLogRecPtr, - options, - false, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - LogicalOutputPrepareWrite, - LogicalOutputWrite, NULL); + ctx = CreateOrInitDecodingContext(InvalidXLogRecPtr, + options, + false, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + LogicalOutputPrepareWrite, + LogicalOutputWrite, NULL); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 36cc2ed4e44..ccbc3732e95 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -116,13 +116,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, bool immediately_reserve, XLogRecPtr restart_lsn, bool find_startpoint) { LogicalDecodingContext *ctx = NULL; Assert(!MyReplicationSlot); + Assert(plugin != NULL); /* * Acquire a logical decoding slot, this will check for conflicting names. @@ -136,30 +137,55 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, failover, false); - /* - * Create logical decoding context to find start point or, if we don't - * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. - * - * Note: when !find_startpoint this is still important, because it's at - * this point that the output plugin is validated. - */ - ctx = CreateInitDecodingContext(plugin, NIL, - false, /* just catalogs is OK */ - restart_lsn, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - NULL, NULL, NULL); + if (immediately_reserve) + { + /* + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin + * sanity. + * + * Note: when !find_startpoint this is still important, because it's + * at this point that the output plugin is validated. + */ + ctx = CreateInitDecodingContext(plugin, NIL, + false, /* just catalogs is OK */ + restart_lsn, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - /* - * If caller needs us to determine the decoding start point, do so now. - * This might take a while. - */ - if (find_startpoint) - DecodingContextFindStartpoint(ctx); + /* + * If caller needs us to determine the decoding start point, do so + * now. This might take a while. + */ + if (find_startpoint) + DecodingContextFindStartpoint(ctx); + + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + } + else + { + NameData plugin_name; + + /* + * On a standby, this check is also required while creating the slot. + * Check the comments in the function. + */ + CheckLogicalDecodingRequirements(); - /* don't need the decoding context anymore */ - FreeDecodingContext(ctx); + /* + * Register output plugin name with slot. We need the mutex to avoid + * concurrent reading of a partially copied string. But we don't want + * any complicated code while holding a spinlock, so do namestrcpy() + * outside. + */ + namestrcpy(&plugin_name, plugin); + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.plugin = plugin_name; + SpinLockRelease(&MyReplicationSlot->mutex); + } } /* @@ -173,6 +199,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + bool immediately_reserve = PG_GETARG_BOOL(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -191,6 +218,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, failover, + immediately_reserve, InvalidXLogRecPtr, true); @@ -726,6 +754,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + true, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9fa8beb6103..85668ef807c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1462,12 +1462,12 @@ StartLogicalReplication(StartReplicationCmd *cmd) * are reported early. */ logical_decoding_ctx = - CreateDecodingContext(cmd->startpoint, cmd->options, false, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), - WalSndPrepareWrite, WalSndWriteData, - WalSndUpdateProgress); + CreateOrInitDecodingContext(cmd->startpoint, cmd->options, false, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); xlogreader = logical_decoding_ctx->reader; WalSndSetState(WALSNDSTATE_CATCHUP); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 37a484147a8..62b302fbd1f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11470,10 +11470,10 @@ { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,immediately_reserve,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9..481ea53e7e1 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -132,6 +132,13 @@ extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); +extern LogicalDecodingContext *CreateOrInitDecodingContext(XLogRecPtr restart_lsn, + List *output_plugin_options, + bool fast_foward, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); -- 2.43.5