From faee83481f2e04d5ac5a62657d2eadecb7205247 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 20 Mar 2024 22:38:24 +0000 Subject: [PATCH v13 4/6] Allow setting inactive_timeout in the replication command --- doc/src/sgml/protocol.sgml | 20 ++++++ src/backend/commands/subscriptioncmds.c | 6 +- .../libpqwalreceiver/libpqwalreceiver.c | 61 ++++++++++++++++--- src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/slot.c | 30 ++++++++- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 38 +++++++++--- src/include/replication/slot.h | 3 +- src/include/replication/walreceiver.h | 11 ++-- src/test/recovery/t/001_stream_rep.pl | 50 +++++++++++++++ 10 files changed, 195 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index a5cb19357f..2ffa1b470a 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2068,6 +2068,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + INACTIVE_TIMEOUT [ integer ] + + + If set to a non-zero value, specifies the amount of time in seconds + the slot is allowed to be inactive. The default is zero. + + + @@ -2168,6 +2178,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + INACTIVE_TIMEOUT [ integer ] + + + If set to a non-zero value, specifies the amount of time in seconds + the slot is allowed to be inactive. The default is zero. 
+ + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5a47fa984d..4562de49c4 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -827,7 +827,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL); + opts.failover, 0, CRS_NOEXPORT_SNAPSHOT, NULL); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); @@ -849,7 +849,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, else if (opts.slot_name && (opts.failover || walrcv_server_version(wrconn) >= 170000)) { - walrcv_alter_slot(wrconn, opts.slot_name, opts.failover); + walrcv_alter_slot(wrconn, opts.slot_name, &opts.failover, NULL); } } PG_FINALLY(); @@ -1541,7 +1541,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover); + walrcv_alter_slot(wrconn, sub->slotname, &opts.failover, NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 761bf0f677..126250a076 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -77,10 +77,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, bool temporary, bool two_phase, bool failover, + int inactive_timeout, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover); + bool *failover, int *inactive_timeout); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1008,7 +1009,8 @@ libpqrcv_send(WalReceiverConn 
*conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, bool failover, + bool temporary, bool two_phase, + bool failover, int inactive_timeout, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; @@ -1048,6 +1050,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoChar(&cmd, ' '); } + if (inactive_timeout > 0) + { + appendStringInfo(&cmd, "INACTIVE_TIMEOUT %d", inactive_timeout); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } + if (use_new_options_syntax) { switch (snapshot_action) @@ -1084,10 +1095,24 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, } else { + appendStringInfoString(&cmd, " PHYSICAL "); if (use_new_options_syntax) - appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)"); - else - appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); + appendStringInfoChar(&cmd, '('); + + appendStringInfoString(&cmd, "RESERVE_WAL"); + + if (inactive_timeout > 0) + { + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + + appendStringInfo(&cmd, "INACTIVE_TIMEOUT %d", inactive_timeout); + } + + if (use_new_options_syntax) + appendStringInfoChar(&cmd, ')'); } res = libpqrcv_PQexec(conn->streamConn, cmd.data); @@ -1121,15 +1146,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover) + bool *failover, int *inactive_timeout) { StringInfoData cmd; PGresult *res; + bool specified_prev_opt = false; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", - quote_identifier(slotname), - failover ? 
"true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s (", + quote_identifier(slotname)); + + if (failover != NULL) + { + appendStringInfo(&cmd, "FAILOVER %s", + *failover ? "true" : "false"); + specified_prev_opt = true; + } + + if (inactive_timeout != NULL) + { + if (specified_prev_opt) + appendStringInfoString(&cmd, ", "); + + appendStringInfo(&cmd, "INACTIVE_TIMEOUT %d", *inactive_timeout); + specified_prev_opt = true; + } + + appendStringInfoChar(&cmd, ')'); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1061d5b61b..59f8e5fbaa 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1431,6 +1431,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , MySubscription->failover, + 0, CRS_USE_SNAPSHOT, origin_startpos); /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 195771920f..aba5e981d7 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -808,8 +808,10 @@ ReplicationSlotDrop(const char *name, bool nowait) * Change the definition of the slot identified by the specified name. */ void -ReplicationSlotAlter(const char *name, bool failover) +ReplicationSlotAlter(const char *name, bool failover, int inactive_timeout) { + bool lock_acquired; + Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, false); @@ -852,10 +854,36 @@ ReplicationSlotAlter(const char *name, bool failover) errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a temporary replication slot")); + /* + * Do not allow users to set inactive_timeout for temporary slots because + * temporary slots will not be saved to the disk.
+ */ + if (inactive_timeout > 0 && MyReplicationSlot->data.persistency == RS_TEMPORARY) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot set inactive_timeout for a temporary replication slot")); + + lock_acquired = false; if (MyReplicationSlot->data.failover != failover) { SpinLockAcquire(&MyReplicationSlot->mutex); + lock_acquired = true; MyReplicationSlot->data.failover = failover; + } + + if (MyReplicationSlot->data.inactive_timeout != inactive_timeout) + { + if (!lock_acquired) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + lock_acquired = true; + } + + MyReplicationSlot->data.inactive_timeout = inactive_timeout; + } + + if (lock_acquired) + { SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotMarkDirty(); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index acda5f68d9..ac2ebb0c69 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -389,7 +389,7 @@ WalReceiverMain(char *startup_data, size_t startup_data_len) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5315c08650..0420274247 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1123,13 +1123,15 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase, bool *failover) + bool *two_phase, bool *failover, + int *inactive_timeout) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; bool failover_given = false; + bool inactive_timeout_given = false; /* Parse 
options */ foreach(lc, cmd->options) @@ -1188,6 +1190,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, failover_given = true; *failover = defGetBoolean(defel); } + else if (strcmp(defel->defname, "inactive_timeout") == 0) + { + if (inactive_timeout_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + inactive_timeout_given = true; + *inactive_timeout = defGetInt32(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1205,6 +1216,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) bool reserve_wal = false; bool two_phase = false; bool failover = false; + int inactive_timeout = 0; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1215,13 +1227,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, - &failover); + &failover, &inactive_timeout); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false, 0); + false, false, false, inactive_timeout); if (reserve_wal) { @@ -1252,7 +1264,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false, 0); + two_phase, failover, false, inactive_timeout); /* * Do options check early so that we can bail before calling the @@ -1411,9 +1423,11 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) * Process extra options given to ALTER_REPLICATION_SLOT. 
*/ static void -ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover, + int *inactive_timeout) { bool failover_given = false; + bool inactive_timeout_given = false; /* Parse options */ foreach_ptr(DefElem, defel, cmd->options) @@ -1427,6 +1441,15 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) failover_given = true; *failover = defGetBoolean(defel); } + else if (strcmp(defel->defname, "inactive_timeout") == 0) + { + if (inactive_timeout_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + inactive_timeout_given = true; + *inactive_timeout = defGetInt32(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1439,9 +1462,10 @@ static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd) { bool failover = false; + int inactive_timeout = 0; - ParseAlterReplSlotOptions(cmd, &failover); - ReplicationSlotAlter(cmd->slotname, failover); + ParseAlterReplSlotOptions(cmd, &failover, &inactive_timeout); + ReplicationSlotAlter(cmd->slotname, failover, inactive_timeout); } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index ff62542b03..77def17386 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -246,7 +246,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDropAcquired(void); -extern void ReplicationSlotAlter(const char *name, bool failover); +extern void ReplicationSlotAlter(const char *name, bool failover, + int inactive_timeout); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 12f71fa99b..038812fd24 
100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -366,6 +366,7 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, bool temporary, bool two_phase, bool failover, + int inactive_timeout, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); @@ -377,7 +378,7 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover); + bool *failover, int *inactive_timeout); /* * walrcv_get_backend_pid_fn @@ -453,10 +454,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) -#define walrcv_alter_slot(conn, slotname, failover) \ - WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, inactive_timeout, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, inactive_timeout, snapshot_action, lsn) +#define walrcv_alter_slot(conn, slotname, failover, inactive_timeout) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, inactive_timeout) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index 5311ade509..db00b6aa24 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -604,4 +604,54 @@ ok( pump_until( 'base backup cleanly canceled'); 
$sigchld_bb->finish(); +# Drop any existing slots on the primary, for the follow-up tests. +$node_primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;"); + +# Test setting inactive_timeout option via replication commands. +$node_primary->append_conf( + 'postgresql.conf', qq( +wal_level = logical +)); +$node_primary->restart; + +$node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT it_phy_slot1 PHYSICAL (RESERVE_WAL, INACTIVE_TIMEOUT 100);", + extra_params => [ '-d', $connstr_db ]); + +$node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT it_phy_slot2 PHYSICAL (RESERVE_WAL);", + extra_params => [ '-d', $connstr_db ]); + +$node_primary->psql( + 'postgres', + "ALTER_REPLICATION_SLOT it_phy_slot2 (INACTIVE_TIMEOUT 200);", + extra_params => [ '-d', $connstr_db ]); + +$node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT it_log_slot1 LOGICAL pgoutput (TWO_PHASE, INACTIVE_TIMEOUT 300);", + extra_params => [ '-d', $connstr_db ]); + +$node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT it_log_slot2 LOGICAL pgoutput;", + extra_params => [ '-d', $connstr_db ]); + +$node_primary->psql( + 'postgres', + "ALTER_REPLICATION_SLOT it_log_slot2 (INACTIVE_TIMEOUT 400);", + extra_params => [ '-d', $connstr_db ]); + +my $slot_info_expected = 'it_log_slot1|logical|300 +it_log_slot2|logical|400 +it_phy_slot1|physical|100 +it_phy_slot2|physical|0'; + +my $slot_info = $node_primary->safe_psql('postgres', + qq[SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;]); +is($slot_info, $slot_info_expected, "replication slots with inactive_timeout on primary exist"); + done_testing(); -- 2.34.1