Thread: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs


From: "Shulgin, Oleksandr"

Hello,

I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if it were changes decoded by a logical decoding plugin.

The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening for new changes on the slot.
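
For illustration, the replication-protocol side of that flow would look roughly like this (slot and plugin names are examples only):

-- On a replication connection (e.g. psql "replication=database dbname=mydb"):
CREATE_REPLICATION_SLOT "slot1" LOGICAL "test_decoding";
-- The result row carries a snapshot_name, e.g. 'XXXXXXXX-N', which a regular
-- connection can adopt with SET TRANSACTION SNAPSHOT before reading the
-- existing data; START_REPLICATION then picks up changes from the slot's
-- consistent_point onward.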

One of the implementations of this approach is "Bottled Water": https://github.com/confluentinc/bottledwater-pg

The way this initial export phase is implemented there is by providing a SQL-callable set-returning function that uses SPI to run "SELECT * FROM mytable" behind the scenes and feeds the resulting tuples through the INSERT callback of the logical decoding plugin, which lives in the same loadable module as this SQL function.

The Bottled Water logical decoding plugin uses a binary protocol based on the Avro data serialization library.  As an experiment I added JSON output support to it, and for that I had to re-implement the aforementioned SRF that exports the initial data, to convert tuples to JSON instead.

Now I'd like to compare the performance impact of using JSON vs. Avro vs. the binary format of pglogical_output, and the missing piece for that is something that streams the existing data in pglogical's format.  Instead of writing one more implementation of the export function, this time for pglogical_output, I'd rather use a generic function that accepts a relation name, a logical decoding plugin name, and a set of startup options for the plugin, then pretends that we're decoding a stream of INSERTs on a slot (no actual slot is needed for that, but setting the transaction snapshot beforehand is something to be done by the client).

In SQL and C pseudo-code:

CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation(
    relnamespace text,
    relname text,
    plugin_name text,
    nochildren boolean DEFAULT FALSE,
    VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF text
AS '...', 'pg_logical_stream_relation' LANGUAGE C VOLATILE;

CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation_binary(
    relnamespace text,
    relname text,
    plugin_name text,
    nochildren boolean DEFAULT FALSE,
    VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF bytea
AS '...', 'pg_logical_stream_relation_binary' LANGUAGE C VOLATILE;

-- usage:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT 'XXXXXXXX-N';

SELECT *
  FROM pg_logical_stream_relation('myschema', 'mytable', 'test_decoding',
                                  nochildren := FALSE, ...);


Datum
pg_logical_stream_relation(PG_FUNCTION_ARGS)
{
    if (SRF_IS_FIRSTCALL())
    {
        /* create decoding context (starts the plugin up) */
        /* open a seq scan of the target relation */
        /* emit BEGIN */
    }

    /*
     * fetch the next tuple from the scan and run it through the
     * plugin's change callback => emit one INSERT per call
     */

    /*
     * when the scan is exhausted: emit COMMIT, free the decoding
     * context (shuts the plugin down), and return done
     */
}

What do you say?

--
Alex

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: Simon Riggs

On 15 January 2016 at 08:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
 
I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if it were changes decoded by a logical decoding plugin.

The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening for new changes on the slot.

It sounds like this is already possible. 

--
Simon Riggs                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: "Shulgin, Oleksandr"

On Fri, Jan 15, 2016 at 11:08 AM, Simon Riggs <simon@2ndquadrant.com> wrote:
On 15 January 2016 at 08:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
 
I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if it were changes decoded by a logical decoding plugin.

The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening for new changes on the slot.

It sounds like this is already possible. 

Totally, that's how it was supposed to be used anyway.  What is missing IMO is retrieving the initial snapshot in the same format that the later changes will arrive in.

--
Alex

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: "Shulgin, Oleksandr"

On Fri, Jan 15, 2016 at 12:09 PM, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
On Fri, Jan 15, 2016 at 11:08 AM, Simon Riggs <simon@2ndquadrant.com> wrote:
On 15 January 2016 at 08:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
 
I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if it were changes decoded by a logical decoding plugin.

The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening for new changes on the slot.

It sounds like this is already possible. 

Totally, that's how it was supposed to be used anyway.  What is missing IMO is retrieving the initial snapshot in the same format that the later changes will arrive in.

POC patch attached.  Findings:

1) Needs an actual slot for all the decode machinery to work (code depends on MyReplicationSlot being set).
2) Requires a core patch.
3) Currently only supports textual output; adding binary is trivial.


Acquiring a slot means this cannot be run in parallel from multiple backends.  Any ideas on how to overcome this (except for opening multiple slots with the same LSN)?
To obtain a consistent snapshot, the client still needs to take care of preserving and setting the transaction snapshot properly.
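
For completeness, the client-side flow with this POC function looks like this (snapshot name as exported by CREATE_REPLICATION_SLOT):

BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT 'XXXXXXXX-N';
SELECT * FROM pg_logical_slot_stream_relation('slot1', 'pg_catalog', 'pg_class');
COMMIT;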

--
Alex


Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: "Shulgin, Oleksandr"

On Fri, Jan 15, 2016 at 5:31 PM, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:

POC patch attached.  Findings:

1) Needs an actual slot for all the decode machinery to work (code depends on MyReplicationSlot being set).
2) Requires a core patch.
3) Currently only supports textual output; adding binary is trivial.


Acquiring a slot means this cannot be run in parallel from multiple backends.  Any ideas on how to overcome this (except for opening multiple slots with the same LSN)?
To obtain a consistent snapshot, the client still needs to take care of preserving and setting the transaction snapshot properly.

Testing revealed a number of problems with memory handling in this code; a corrected v2 is attached.

A completely different problem is proper handling of the SPI stack and releasing the replication slot.  The latter I'd like to avoid dealing with, because at the moment it's not possible to stream a number of relations in parallel using this POC function, so I'd rather move in the direction of not acquiring the replication slot at all.

The SPI problem manifests itself if I place a LIMIT on top of the query:

# SELECT pg_logical_slot_stream_relation('slot1', 'pg_catalog', 'pg_class') LIMIT 5;
WARNING:  relcache reference leak: relation "pg_class" not closed
WARNING:  transaction left non-empty SPI stack
HINT:  Check for missing "SPI_finish" calls.

I wonder if there is a way to install some sort of cleanup handler that will be called by the executor?
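
A minimal sketch of what I have in mind, using the executor's ExprContext shutdown callbacks (the state struct and cleanup details are hypothetical):

#include "postgres.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "replication/slot.h"

/* Hypothetical bookkeeping kept by the SRF across calls. */
typedef struct StreamRelState
{
    bool        spi_connected;
    bool        slot_acquired;
} StreamRelState;

/*
 * Run by the executor when the ExprContext is shut down, even if the
 * SRF never reaches its last call (e.g. because of LIMIT).
 */
static void
stream_rel_shutdown(Datum arg)
{
    StreamRelState *state = (StreamRelState *) DatumGetPointer(arg);

    if (state->spi_connected)
        SPI_finish();
    if (state->slot_acquired)
        ReplicationSlotRelease();
}

/* ...in the SRF, on first call, after setting up "state": */
RegisterExprContextCallback(((ReturnSetInfo *) fcinfo->resultinfo)->econtext,
                            stream_rel_shutdown,
                            PointerGetDatum(state));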

--
Alex

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: Craig Ringer
On 15 January 2016 at 16:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
 
I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if it were changes decoded by a logical decoding plugin.

So effectively produce synthetic logical decoding callbacks to run a bunch of fake INSERTs, presumably with a fake xid etc?
 
The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening for new changes on the slot.

My impression is that you want to avoid the current step of "synchronize database initial contents" when using logical decoding for replication. But I guess you're looking to then populate that empty schema in-band via logical decoding, rather than having to do a --data-only dump or use COPY.  Right?

That won't help you for schema; presumably you'd still do a pg_dump --schema-only | pg_restore for that.

Just like when restoring a --data-only dump or using COPY, you'd have to disable FKs during sync, but that's pretty much unavoidable.
 
The way this initial export phase is implemented there is by providing a SQL-callable set-returning function that uses SPI to run "SELECT * FROM mytable" behind the scenes and feeds the resulting tuples through the INSERT callback of the logical decoding plugin, which lives in the same loadable module as this SQL function.

o_O

What about the reorder buffer, the logical decoding memory context, etc?
 
The Bottled Water logical decoding plugin uses a binary protocol based on the Avro data serialization library.  As an experiment I added JSON output support to it, and for that I had to re-implement the aforementioned SRF that exports the initial data, to convert tuples to JSON instead.

Have you taken a look at what's been done with pglogical and pglogical_output? 

We've got extensible protocol support there, and if Avro offers compelling benefits over the current binary serialization I'm certainly interested in hearing about it.
 
What do you say?

Interesting idea. As outlined I think it sounds pretty fragile though; I really, really don't like the idea of lying to the insert callback by passing it a fake insert with (presumably) fake reorder buffer txn, etc.

What we've done in pglogical is take a --schema-only dump then, on the downstream, attach to the exported snapshot and use COPY ... TO STDOUT over a libpq connection to the upstream, feeding that to COPY ... FROM STDIN on another libpq connection to "ourselves", i.e. the downstream. Unless Petr changed it to use COPY innards directly on the downstream; I know he talked about it but haven't checked if he did. Anyway, either way it's not pretty and requires a sideband non-replication connection to sync initial state. The upside is that it can be relatively easily parallelized for faster sync using multiple connections.
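
Roughly, per table (snapshot name and relation are placeholders):

-- Upstream connection, attached to the slot's exported snapshot:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT 'XXXXXXXX-N';
COPY myschema.mytable TO STDOUT;

-- ...piped into the downstream connection:
COPY myschema.mytable FROM STDIN;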

To what extent are you setting up a true logical decoding context here? Where does the xact info come from? The commit record? etc. You're presumably not forming a reorder buffer then decoding it since it could create a massive tempfile on disk, so are you just dummying this info up? Or hoping the plugin won't look at it?

The functionality is good and I think that for the SQL level you'd have to use SET TRANSACTION SNAPSHOT as you show. But I think it should really be usable from the replication protocol too - and should try to keep the state as close to that of a normal decoding session as possible. We'd at least need a new walsender protocol command equivalent that took the snapshot identifier, relation info and the other decoding params instead of a slot name. Or, ideally, a variant on START_REPLICATION ... LOGICAL ... that omits SLOT and instead takes TABLES as an argument, with a list of relation(s) to sync. Unlike normal START_REPLICATION ... LOGICAL ... it'd return to walsender protocol mode on completion, like the phys rep protocol does when it's time for a timeline switch.
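
Something like the following, purely hypothetical, syntax:

-- Hypothetical walsender command; none of this grammar exists today:
START_REPLICATION LOGICAL 0/0 TABLES (myschema.mytable, myschema.other)
    ("snapshot" 'XXXXXXXX-N', "expected_encoding" 'UTF8');
-- Streams the listed tables as decoded changes, then drops back to
-- walsender command mode on completion.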

Rather than lie to the insert callback I'd really rather define a new logical decoding callback for copying initial records. It doesn't get any xact info (since it's not useful/relevant) or a full reorder buffer. No ReorderBufferChange is passed; instead we pass something like a ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf, origin id, origin lsn and commit timestamp (if known) and the RelFileNode affected. The LogicalDecodingContext that's set up for the callback gets ctx->reorder = NULL.  There's no ReorderBufferTxn argument and none is defined.

Since it's a new callback the plugin knows the rules, knows it's getting initial state data to sync over, etc. It doesn't have to try to guess if it's seeing a real insert and act differently with respect to xact identity etc.
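
In rough C terms (all names are of course up for debate):

/* Hypothetical payload for the initial-copy callback: no ReorderBufferTxn,
 * no ReorderBufferChange. */
typedef struct ReorderBufferSync
{
    ReorderBufferTupleBuf *tuple;       /* the new tuple */
    RepOriginId            origin_id;
    XLogRecPtr             origin_lsn;
    TimestampTz            commit_time; /* if known, else 0 */
    RelFileNode            node;        /* relation affected */
} ReorderBufferSync;

/* Hypothetical new output plugin callback; ctx->reorder is NULL here. */
typedef void (*LogicalDecodeSyncTupleCB) (struct LogicalDecodingContext *ctx,
                                          Relation relation,
                                          ReorderBufferSync *sync);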

Obviously that's 9.6 material at the soonest, and only 9.6 if it could be done ... well, right about now. So that won't meet your immediate needs, but I think the same is true of the interface you propose above.

What I suggest doing in the mean time is specifying a new callback function interface for tuple copies as described above, to be implemented by logical decoding modules that support this extension. In each decoding plugin we then define a SQL-callable function with 'internal' return type that returns a pointer to the callback so you can obtain the hook function address via a fmgr call via pg_proc. The callback would expect a state much like I describe above and we'd use a SQL-callable function like what you outlined to set up a fake logical decoding state for it, complete with decoding context etc. Probably copying & pasting a moderately painful amount of the logical decoding guts into an ext in the process :( since I don't think you can easily set up much of the decoding state using the decoding backend code without having a slot to use. Still, that'd let us prototype this and prove the idea for inclusion in 9.7 (?) in-core while retaining the capability via an extension for earlier versions.
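
E.g. each participating plugin might expose its callback like this (sketch only; the function name is made up, and the dummy internal argument is there because a function returning internal must take one):

-- Hypothetical per-plugin function handing out the callback pointer;
-- the streaming extension looks it up via pg_proc and calls it
-- through fmgr to obtain the hook address.
CREATE FUNCTION pglogical_output_sync_cb(internal) RETURNS internal
AS 'pglogical_output', 'pglogical_output_sync_cb' LANGUAGE C STRICT;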

You'd have to do much of the same hoop jumping to call an arbitrary output plugin's insert callback directly, if not more.

Alternately, you could just use COPY ;)

--
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: "Shulgin, Oleksandr"

On Wed, Jan 20, 2016 at 7:57 AM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 15 January 2016 at 16:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
 
I'd like to propose generic functions (probably in an extension, or in core if not possible otherwise) to facilitate streaming existing data from the database *in the same format* that one would get if it were changes decoded by a logical decoding plugin.

So effectively produce synthetic logical decoding callbacks to run a bunch of fake INSERTs, presumably with a fake xid etc?

Exactly.
 
The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT command of the replication protocol to get a consistent snapshot of the database, then start listening for new changes on the slot.

My impression is that you want to avoid the current step of "synchronize database initial contents" when using logical decoding for replication.

Yes, but...
 
But I guess you're looking to then populate that empty schema in-band via logical decoding, rather than having to do a --data-only dump or use COPY.  Right?

That won't help you for schema; presumably you'd still do a pg_dump --schema-only | pg_restore for that.

Just like when restoring a --data-only dump or using COPY, you'd have to disable FKs during sync, but that's pretty much unavoidable.

All of this implies another *postgres* database on the receiving side, which is not necessarily the case for my research.

The way this initial export phase is implemented there is by providing a SQL-callable set-returning function that uses SPI to run "SELECT * FROM mytable" behind the scenes and feeds the resulting tuples through the INSERT callback of the logical decoding plugin, which lives in the same loadable module as this SQL function.

o_O

What about the reorder buffer, the logical decoding memory context, etc?

As shown by the POC patch, it is rather straightforward to achieve.

The Bottled Water logical decoding plugin uses a binary protocol based on the Avro data serialization library.  As an experiment I added JSON output support to it, and for that I had to re-implement the aforementioned SRF that exports the initial data, to convert tuples to JSON instead.

Have you taken a look at what's been done with pglogical and pglogical_output? 

We've got extensible protocol support there, and if Avro offers compelling benefits over the current binary serialization I'm certainly interested in hearing about it.

This is what I'm going to benchmark.  With the generic function I can just create two slots: one for pglogical and another one for BottledWater/Avro and see which one performs better when forced to stream some TB worth of INSERTs through the change callback.
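
Roughly (the BottledWater plugin name may differ from what I show here):

-- One slot per plugin under test:
SELECT * FROM pg_create_logical_replication_slot('slot_pgl', 'pglogical_output');
SELECT * FROM pg_create_logical_replication_slot('slot_bw', 'bottledwater');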

What do you say?

Interesting idea. As outlined I think it sounds pretty fragile though; I really, really don't like the idea of lying to the insert callback by passing it a fake insert with (presumably) fake reorder buffer txn, etc.

Fair enough.  However, for performance testing it might not be that bad, even if none of that lands in the actual API.

What we've done in pglogical is take a --schema-only dump then, on the downstream, attach to the exported snapshot and use COPY ... TO STDOUT over a libpq connection to the upstream, feeding that to COPY ... FROM STDIN on another libpq connection to "ourselves", i.e. the downstream. Unless Petr changed it to use COPY innards directly on the downstream; I know he talked about it but haven't checked if he did. Anyway, either way it's not pretty and requires a sideband non-replication connection to sync initial state. The upside is that it can be relatively easily parallelized for faster sync using multiple connections.

I've also measured that to have a baseline for comparing it to decoding performance.

To what extent are you setting up a true logical decoding context here?

It is done in exactly the same way pg_logical_slot_get/peek_changes() do it.
 
Where does the xact info come from? The commit record? etc.

palloc0()
 
You're presumably not forming a reorder buffer then decoding it since it could create a massive tempfile on disk, so are you just dummying this info up?

In my experience, it doesn't.  We know it's going to be a "committed xact", so we don't really need to queue the changes up before we see a "commit" record.
 
Or hoping the plugin won't look at it?

Pretty much. :-)

The functionality is good and I think that for the SQL level you'd have to use SET TRANSACTION SNAPSHOT as you show. But I think it should really be usable from the replication protocol too - and should try to keep the state as close to that of a normal decoding session as possible. We'd at least need a new walsender protocol command equivalent that took the snapshot identifier, relation info and the other decoding params instead of a slot name. Or, ideally, a variant on START_REPLICATION ... LOGICAL ... that omits SLOT and instead takes TABLES as an argument, with a list of relation(s) to sync. Unlike normal START_REPLICATION ... LOGICAL ... it'd return to walsender protocol mode on completion, like the phys rep protocol does when it's time for a timeline switch.

I've had similar thoughts.

Another consideration is that we might introduce modes for acquiring the slot: Exclusive and Shared access (can be implemented with LWLocks?), so that peek_changes() and stream_relation() could acquire the slot in Shared access mode, thus allowing parallel queries, while START_REPLICATION and get_changes() would require Exclusive access.
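
A sketch of what that might look like (hypothetical; today ReplicationSlotAcquire() takes only the slot name):

/* Hypothetical extension of the slot API. */
typedef enum SlotAcquireMode
{
    SLOT_ACQUIRE_SHARED,    /* peek_changes(), stream_relation() */
    SLOT_ACQUIRE_EXCLUSIVE  /* START_REPLICATION, get_changes() */
} SlotAcquireMode;

/* Shared mode would take a per-slot LWLock in LW_SHARED, exclusive
 * mode in LW_EXCLUSIVE, replacing the current single-owner rule. */
extern void ReplicationSlotAcquire(const char *name, SlotAcquireMode mode);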

Rather than lie to the insert callback I'd really rather define a new logical decoding callback for copying initial records. It doesn't get any xact info (since it's not useful/relevant) or a full reorder buffer. No ReorderBufferChange is passed; instead we pass something like a ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf, origin id, origin lsn and commit timestamp (if known) and the RelFileNode affected. The LogicalDecodingContext that's set up for the callback gets ctx->reorder = NULL.  There's no ReorderBufferTxn argument and none is defined.

Since it's a new callback the plugin knows the rules, knows it's getting initial state data to sync over, etc. It doesn't have to try to guess if it's seeing a real insert and act differently with respect to xact identity etc.

Obviously that's 9.6 material at the soonest, and only 9.6 if it could be done ... well, right about now. So that won't meet your immediate needs, but I think the same is true of the interface you propose above.

That can be a good approach going forward, yes.

What I suggest doing in the mean time is specifying a new callback function interface for tuple copies as described above, to be implemented by logical decoding modules that support this extension. In each decoding plugin we then define a SQL-callable function with 'internal' return type that returns a pointer to the callback so you can obtain the hook function address via a fmgr call via pg_proc. The callback would expect a state much like I describe above and we'd use a SQL-callable function like what you outlined to set up a fake logical decoding state for it, complete with decoding context etc. Probably copying & pasting a moderately painful amount of the logical decoding guts into an ext in the process :( since I don't think you can easily set up much of the decoding state using the decoding backend code without having a slot to use. Still, that'd let us prototype this and prove the idea for inclusion in 9.7 (?) in-core while retaining the capability via an extension for earlier versions.

You'd have to do much of the same hoop jumping to call an arbitrary output plugin's insert callback directly, if not more.

Alternately, you could just use COPY ;)

Thanks for the thoughtful reply!  I'm going to experiment with my toy code a bit more, while keeping in mind what a more workable approach could look like.

--
Alex

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: Craig Ringer

On 20 January 2016 at 15:50, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
 
All of this implies another *postgres* database on the receiving side, which is not necessarily the case for my research.

Good point. It might not be a DB at all, either, i.e. it might not understand INSERTs and you may want data in some arbitrary format. Like json.

The same is true for other intended uses of pglogical_output. It's a pain to have to query the desired initial state out of the database via normal libpq, so I like your idea to let the output plugin deal with that.

 
This is what I'm going to benchmark.  With the generic function I can just create two slots: one for pglogical and another one for BottledWater/Avro and see which one performs better when forced to stream some TB worth of INSERTs through the change callback.

Makes sense.
 
What do you say?

Interesting idea. As outlined I think it sounds pretty fragile though; I really, really don't like the idea of lying to the insert callback by passing it a fake insert with (presumably) fake reorder buffer txn, etc.

Fair enough.  However, for performance testing it might not be that bad, even if none of that lands in the actual API.

I agree. It's fine for performance testing.
 
It should be relatively simple to add to pglogical_output, though you might have to hack out a few things if the faked-up state doesn't fully stand up to scrutiny.


You're presumably not forming a reorder buffer then decoding it since it could create a massive tempfile on disk, so are you just dummying this info up?

In my experience, it doesn't.  We know it's going to be a "committed xact", so we don't really need to queue the changes up before we see a "commit" record.

OK.

That's probably going to confuse pglogical_output a bit because it looks at the tx start/end records. But it might not look closely enough to care, to be honest, and may cope OK with the bogus data. It can probably be hacked around for testing purposes.
 
 
Another consideration is that we might introduce modes for acquiring the slot: Exclusive and Shared access (can be implemented with LWLocks?), so that peek_changes() and stream_relation() could acquire the slot in Shared access mode, thus allowing parallel queries, while START_REPLICATION and get_changes() would require Exclusive access.

That'd be nice, but probably not totally necessary for streaming relations. It doesn't really need the slot at all. Or shouldn't, I think. Though it might be easiest to allow it to acquire the slot just for convenience and shared code.
 

Thanks for the thoughtful reply!  I'm going to experiment with my toy code a bit more, while keeping in mind what a more workable approach could look like.

Great.

I'll be really interested in your results.

If you have trouble making pglogical_output cooperate with your tests and measurements feel free to mail me directly and I'll see if I can help find what's going wrong in the test harness or in pglogical_output . 

--
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

Re: Stream consistent snapshot via a logical decoding plugin as a series of INSERTs

From: "Shulgin, Oleksandr"

On Wed, Jan 20, 2016 at 9:26 AM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 20 January 2016 at 15:50, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:

That'd be nice, but probably not totally necessary for streaming relations. It doesn't really need the slot at all. Or shouldn't, I think. Though it might be easiest to allow it to acquire the slot just for convenience and shared code.

Yes, before looking at the code I thought I could do without a slot, but the dependency on MyReplicationSlot being set in a number of places has forced me to actually acquire it, in order to keep the changes more contained.

--
Alex