From 52f07aa03ebde429cf3dccbe21bc6fa8e59eacc2 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Tue, 23 Feb 2016 16:04:05 +0800 Subject: [PATCH 4/7] Add the UI and for failover slots Expose failover slots to the user. Add a new 'failover' argument to pg_create_logical_replication_slot and pg_create_physical_replication_slot . Accept a new FAILOVER keyword argument in CREATE_REPLICATION_SLOT on the walsender protocol. --- contrib/test_decoding/expected/ddl.out | 3 +++ contrib/test_decoding/sql/ddl.sql | 2 ++ src/backend/catalog/system_views.sql | 11 ++++++++++- src/backend/replication/repl_gram.y | 13 +++++++++++-- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/slotfuncs.c | 7 +++++-- src/backend/replication/walsender.c | 4 ++-- src/include/catalog/pg_proc.h | 4 ++-- src/include/nodes/replnodes.h | 1 + src/include/replication/slot.h | 1 + 10 files changed, 38 insertions(+), 9 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 57a1289..5fed500 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -9,6 +9,9 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d -- fail because of an already existing slot SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); ERROR: replication slot "regression_slot" already exists +-- fail because a failover slot can't replace a normal slot on the master +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true); +ERROR: replication slot "regression_slot" already exists -- fail because of an invalid name SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding'); ERROR: replication slot name "Invalid Name" contains invalid character diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index e311c59..dc61ef4 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -4,6 +4,8 @@ SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -- fail because of an already existing slot SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +-- fail because a failover slot can't replace a normal slot on the master +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', true); -- fail because of an invalid name SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding'); diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index abf9a70..fcb877d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -949,12 +949,21 @@ AS 'pg_logical_slot_peek_binary_changes'; CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, - OUT slot_name name, OUT xlog_position pg_lsn) + IN failover boolean DEFAULT false, OUT slot_name name, + OUT xlog_position pg_lsn) RETURNS RECORD LANGUAGE INTERNAL STRICT VOLATILE AS 'pg_create_physical_replication_slot'; +CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( + IN slot_name name, IN plugin name, IN failover boolean DEFAULT false, + OUT slot_name text, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +STRICT VOLATILE +AS 'pg_create_logical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index d93db88..1574f24 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -77,6 +77,7 @@ Node *replication_parse_result; %token K_LOGICAL %token K_SLOT %token K_RESERVE_WAL +%token K_FAILOVER %type command %type base_backup start_replication start_logical_replication @@ -90,6 +91,7 @@ Node *replication_parse_result; %type plugin_opt_arg %type opt_slot %type opt_reserve_wal +%type opt_failover %% @@ -184,23 +186,25 @@ base_backup_opt: create_replication_slot: /* CREATE_REPLICATION_SLOT slot PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal + K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL opt_reserve_wal opt_failover { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_PHYSICAL; cmd->slotname = $2; cmd->reserve_wal = $4; + cmd->failover = $5; $$ = (Node *) cmd; } /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT + | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT opt_failover { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_LOGICAL; cmd->slotname = $2; cmd->plugin = $4; + cmd->failover = $5; $$ = (Node *) cmd; } ; @@ -276,6 +280,11 @@ opt_reserve_wal: | /* EMPTY */ { $$ = false; } ; +opt_failover: + K_FAILOVER { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + opt_slot: K_SLOT IDENT { $$ = $2; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index f83ec53..a1d9f10 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -98,6 +98,7 @@ PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } +FAILOVER { return K_FAILOVER; } "," { return ','; } ";" { return ';'; } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index f430714..a2dfc40 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -18,6 +18,7 @@ #include "access/htup_details.h" #include "replication/slot.h" +#include "replication/slot_xlog.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" #include "utils/builtins.h" @@ -41,6 +42,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); bool immediately_reserve = PG_GETARG_BOOL(1); + bool failover = PG_GETARG_BOOL(2); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -57,7 +59,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, false); + ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT, failover); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; @@ -96,6 +98,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); Name plugin = PG_GETARG_NAME(1); + bool failover = PG_GETARG_BOOL(2); LogicalDecodingContext *ctx = NULL; @@ -120,7 +123,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * errors during initialization because it'll get dropped if this * transaction fails. We'll make it persistent at the end. */ - ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, false); + ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL, failover); /* * Create logical decoding context, to build the initial snapshot. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1583862..efdbfd1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -792,7 +792,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_PHYSICAL) { - ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, false); + ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT, cmd->failover); } else { @@ -803,7 +803,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) * handle errors during initialization because it'll get dropped if * this transaction fails. We'll make it persistent at the end. */ - ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, false); + ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL, cmd->failover); } initStringInfo(&output_message); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index aec6c4c..e7247af 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5077,13 +5077,13 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 DESCR("SP-GiST support for quad tree over range"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,failover,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); -DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,failover,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); DESCR("get changes from replication slot"); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index d2f1edb..a8fa9d5 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd ReplicationKind kind; char *plugin; bool reserve_wal; + bool failover; } CreateReplicationSlotCmd; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index cdcbd37..9e23a29 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -4,6 +4,7 @@ * * Copyright (c) 2012-2016, PostgreSQL Global Development Group * + * src/include/replication/slot.h *------------------------------------------------------------------------- */ #ifndef SLOT_H -- 2.1.0