Stream consistent snapshot via a logical decoding plugin as a series of INSERTs - Mailing list pgsql-hackers

From Shulgin, Oleksandr
Subject Stream consistent snapshot via a logical decoding plugin as a series of INSERTs
Date
Msg-id CACACo5RNZ0OB8KwhuwF4F_xumfgbdXbw1sxdqxmyJ_gCobn=iA@mail.gmail.com
Whole thread Raw
Responses Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs
Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs
List pgsql-hackers
Hello,

I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if these would be the changes decoded by a logical decoding plugin.

The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening to new changes on the slot.

One of the implementations of this approach is "Bottled Water": https://github.com/confluentinc/bottledwater-pg

The way this initial export phase is implemented there is by providing a SQL-callable set returning function which is using SPI to run "SELECT * FROM mytable" behind the scenes and runs the resulting tuples through the INSERT callback of the logical decoding plugin, which lives in the same loadable module as this SQL function.

Bottled Water logical decoding plugin uses binary protocol based on Avro data serialization library.  As an experiment I was adding support for JSON output format to it, and for that I had to re-implement the aforementioned SRF to export initial data to convert tuples to JSON instead.

Now I'd like to compare performance impact of using JSON vs. Avro vs. binary format of pglogical_output and for that a missing part is something that would stream the existing data in pglogical's format.  Instead of writing one more implementation of the export function, this time for pglogical_output, I'd rather use a generic function that accepts a relation name, logical decoding plugin name and a set of startup options for the plugin, then pretends that we're decoding a stream of INSERTs on a slot (no actual slot is needed for that, but setting transaction snapshot beforehand is something to be done by the client).

In SQL and C pseudo-code:

CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation(
    relnamespace text,
    relname text,
    plugin_name text,
    nochildren boolean DEFAULT FALSE,
    VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF text
AS '...', 'pg_logical_stream_relation' LANGUAGE C VOLATILE;

CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation_binary(
    relnamespace text,
    relname text,
    plugin_name text,
    nochildren boolean DEFAULT FALSE,
    VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF bytea
AS '...', 'pg_logical_stream_relation_binary' LANGUAGE C VOLATILE;

-- usage:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT 'XXXXXXXX-N';

SELECT *
  FROM pg_logical_stream_relation('myschema', 'mytable', 'test_decoding',
                                  nochildren := FALSE, ...)


Datum
pg_logical_stream_relation(PG_FUNCTION_ARGS)
{
    if (SRF_IS_FIRSTCALL())
    {
        /* create decoding context */ /* starts the plugin up */
        /* emit BEGIN */
    }
    /*
      seq scan
      => emit series of INSERTs
    */
    /* emit COMMIT */
    /* free decoding context */ /* shuts the plugin down */
}

What do you say?

--
Alex

pgsql-hackers by date:

Previous
From: Abhijit Menon-Sen
Date:
Subject: dealing with extension dependencies that aren't quite 'e'
Next
From: Magnus Hagander
Date:
Subject: Re: Comment typo in port/atomics/generic.h