From af543b5a247e785cb8f4439fc89f979c2b5ec7b2 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Wed, 22 Apr 2020 16:33:07 +0530 Subject: [PATCH v16 11/12] Provide new api to get the streaming changes --- src/backend/catalog/system_views.sql | 8 ++++++++ src/backend/replication/logical/logicalfuncs.c | 23 ++++++++++++++++++----- src/include/catalog/pg_proc.dat | 9 +++++++++ 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 65d650d..d9ab14b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1242,6 +1242,14 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_get_changes'; +CREATE OR REPLACE FUNCTION pg_logical_slot_get_streaming_changes( + IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', + OUT lsn pg_lsn, OUT xid xid, OUT data text) +RETURNS SETOF RECORD +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 +AS 'pg_logical_slot_get_streaming_changes'; + CREATE OR REPLACE FUNCTION pg_logical_slot_peek_changes( IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', OUT lsn pg_lsn, OUT xid xid, OUT data text) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index f5384f1..7561141 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -108,7 +108,8 @@ check_permissions(void) * Helper function for the various SQL callable logical decoding functions. */ static Datum -pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) +pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, + bool binary, bool streaming) { Name name; XLogRecPtr upto_lsn; @@ -237,6 +238,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin LogicalOutputPrepareWrite, LogicalOutputWrite, NULL); + /* If called has not asked for streaming changes then disable it. */ + ctx->streaming &= streaming; + MemoryContextSwitchTo(oldcontext); /* @@ -347,7 +351,16 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, true, false); + return pg_logical_slot_get_changes_guts(fcinfo, true, false, false); +} + +/* + * SQL function to get the streaming changes as text, consuming the data. + */ +Datum +pg_logical_slot_get_streaming_changes(PG_FUNCTION_ARGS) +{ + return pg_logical_slot_get_changes_guts(fcinfo, true, false, true); } /* @@ -356,7 +369,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, false, false); + return pg_logical_slot_get_changes_guts(fcinfo, false, false, false); } /* @@ -365,7 +378,7 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, true, true); + return pg_logical_slot_get_changes_guts(fcinfo, true, true, false); } /* @@ -374,7 +387,7 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { - return pg_logical_slot_get_changes_guts(fcinfo, false, true); + return pg_logical_slot_get_changes_guts(fcinfo, false, true, false); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9fb1ffe..3dfc5c1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10117,6 +10117,15 @@ proargmodes => '{i,i,i,v,o,o,o}', proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', prosrc => 'pg_logical_slot_get_binary_changes' }, +{ oid => '6150', descr => 'get streaming changes from replication slot', + proname => 'pg_logical_slot_get_streaming_changes', procost => '1000', + prorows => '1000', provariadic => 'text', proisstrict => 'f', + proretset => 't', provolatile => 'v', proparallel => 'u', + prorettype => 'record', proargtypes => 'name pg_lsn int4 _text', + proallargtypes => '{name,pg_lsn,int4,_text,pg_lsn,xid,text}', + proargmodes => '{i,i,i,v,o,o,o}', + proargnames => '{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}', + prosrc => 'pg_logical_slot_get_streaming_changes' }, { oid => '3784', descr => 'peek at changes from replication slot', proname => 'pg_logical_slot_peek_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', -- 1.8.3.1