Thread: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Dave Cramer
Date:
For logical replication there is no need to implement this, but others are using the pgoutput plugin for Change Data Capture. They use pgoutput because it is guaranteed to be available, as it is part of core Postgres. 

Implementing LogicalDecodeMessageCB provides some synchronization facility that is not easily replicated.

Thoughts?

Dave Cramer

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Andres Freund
Date:
Hi,

On 2020-07-24 11:33:52 -0400, Dave Cramer wrote:
> For logical replication there is no need to implement this, but others are
> using the pgoutput plugin for Change Data Capture. They use pgoutput
> because it is guaranteed to be available, as it is part of core Postgres.
> 
> Implementing LogicalDecodeMessageCB provides some synchronization facility
> that is not easily replicated.

It's definitely useful. Probably needs to be a parameter that signals
whether they should be sent out?

Greetings,

Andres Freund



Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
David Pirotte
Date:
On Wed, Jul 29, 2020 at 9:41 PM Dave Cramer <davecramer@gmail.com> wrote:
> For logical replication there is no need to implement this, but others are using the pgoutput plugin for Change Data Capture. They use pgoutput because it is guaranteed to be available, as it is part of core Postgres.
>
> Implementing LogicalDecodeMessageCB provides some synchronization facility that is not easily replicated.
>
> Thoughts?

Attached is a draft patch that adds this functionality into the pgoutput plugin. A slot consumer can pass 'messages' as an option to include logical messages from pg_logical_emit_message in the replication flow.

FWIW, we have been using pg_logical_emit_message to send application-level events alongside our change-data-capture for about two years, and we would move this part of our stack to pgoutput if message support was available. 

Looking forward to discussion and feedback.

Cheers,
Dave
Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Cary Huang
Date:
The following review has been posted through the commitfest application:
make installcheck-world:  tested, passed
Implements feature:       tested, passed
Spec compliant:           tested, passed
Documentation:            tested, passed

Hi

I have tried the patch and it functions as described. The attached TAP test case is comprehensive and passes.
However, the patch does not apply cleanly on the current master; I had to check out a much earlier commit to be able to
apply the patch correctly. The patch will need to be rebased onto the current master.
 

Thanks

Cary Huang
-------------
HighGo Software Inc. (Canada)
cary.huang@highgo.ca
www.highgo.ca

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Andres Freund
Date:
Hi,

On 2020-07-29 22:26:04 -0500, David Pirotte wrote:
> FWIW, we have been using pg_logical_emit_message to send application-level
> events alongside our change-data-capture for about two years, and we would
> move this part of our stack to pgoutput if message support was available.

Yea, it's really useful for this kind of thing.


> @@ -119,14 +124,16 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>  
>  static void
>  parse_output_parameters(List *options, uint32 *protocol_version,
> -                        List **publication_names, bool *binary)
> +                        List **publication_names, bool *binary, bool *messages)

I think it might be time to add a PgOutputParameters struct, instead of
adding more and more output parameters to
parse_output_parameters. Alternatively, just passing PGOutputData would
make sense.
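To make the struct idea concrete, here is a minimal standalone sketch, not pgoutput's code. It assumes options arrive as plain name/value string pairs (pgoutput actually receives a List of DefElem nodes), handles only a subset of options (proto_version, binary and the proposed messages), and the names OutputOption, PgOutputOptions and parse_output_options are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for one option, e.g. ("messages", "true"). */
typedef struct OutputOption
{
    const char *name;
    const char *value;
} OutputOption;

/*
 * Hypothetical bundle of parsed parameters: a new option such as
 * "messages" becomes a field here instead of another out-parameter.
 */
typedef struct PgOutputOptions
{
    uint32_t    protocol_version;
    bool        binary;
    bool        messages;
} PgOutputOptions;

/* Accepts a few common boolean spellings; returns false on anything else. */
static bool
parse_bool_value(const char *value, bool *result)
{
    if (strcmp(value, "true") == 0 || strcmp(value, "on") == 0 ||
        strcmp(value, "1") == 0)
        *result = true;
    else if (strcmp(value, "false") == 0 || strcmp(value, "off") == 0 ||
             strcmp(value, "0") == 0)
        *result = false;
    else
        return false;
    return true;
}

/* Fills *out from the option list; returns 0 on success, -1 on any error. */
static int
parse_output_options(const OutputOption *opts, size_t nopts,
                     PgOutputOptions *out)
{
    memset(out, 0, sizeof(*out));   /* defaults: version 0, all flags off */

    for (size_t i = 0; i < nopts; i++)
    {
        const char *name = opts[i].name;
        const char *value = opts[i].value;

        if (strcmp(name, "proto_version") == 0)
        {
            char       *end;
            unsigned long v = strtoul(value, &end, 10);

            if (*value == '\0' || *end != '\0' || v > UINT32_MAX)
                return -1;
            out->protocol_version = (uint32_t) v;
        }
        else if (strcmp(name, "binary") == 0)
        {
            if (!parse_bool_value(value, &out->binary))
                return -1;
        }
        else if (strcmp(name, "messages") == 0)
        {
            if (!parse_bool_value(value, &out->messages))
                return -1;
        }
        else
            return -1;          /* unknown option */
    }
    return 0;
}
```

The point of the design is that the parser's signature stays fixed as options are added; passing the plugin's private data struct directly, as suggested, achieves the same thing.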


> diff --git a/src/test/subscription/t/015_messages.pl b/src/test/subscription/t/015_messages.pl
> new file mode 100644
> index 0000000000..4709e69f4e
> --- /dev/null
> +++ b/src/test/subscription/t/015_messages.pl

A test verifying that a non-transactional message in a transaction that is
later aborted is handled correctly would be good.
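The behaviour such a test should pin down can be stated as a toy model. This is not PostgreSQL code and every name in it is hypothetical; it assumes the documented semantics of pg_logical_emit_message: a transactional message reaches the consumer only if its transaction commits, while a non-transactional message is handed to the output plugin as soon as it is decoded, even if the surrounding transaction later aborts.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_PENDING 16
#define MAX_SENT    16

/* Toy model: which emitted messages reach the consumer. */
typedef struct ToyDecoder
{
    const char *pending[MAX_PENDING];   /* transactional msgs of the open txn */
    size_t      npending;
    const char *sent[MAX_SENT];         /* what the output plugin emitted */
    size_t      nsent;
} ToyDecoder;

static void
toy_send(ToyDecoder *d, const char *msg)
{
    if (d->nsent < MAX_SENT)
        d->sent[d->nsent++] = msg;
}

static void
toy_emit_message(ToyDecoder *d, bool transactional, const char *msg)
{
    if (!transactional)
        toy_send(d, msg);                   /* delivered immediately */
    else if (d->npending < MAX_PENDING)
        d->pending[d->npending++] = msg;    /* held until the txn ends */
}

static void
toy_end_transaction(ToyDecoder *d, bool committed)
{
    if (committed)
        for (size_t i = 0; i < d->npending; i++)
            toy_send(d, d->pending[i]);
    d->npending = 0;                        /* on abort, simply discard */
}
```

In this model, emitting one message of each kind and then aborting leaves only the non-transactional message delivered, which is exactly the case the requested test exercises.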

Greetings,

Andres Freund



Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Michael Paquier
Date:
On Tue, Sep 08, 2020 at 12:18:23PM -0700, Andres Freund wrote:
> A test verifying that a non-transactional message in a transaction that is
> later aborted is handled correctly would be good.

On top of that, the patch needs a rebase as it visibly fails to apply,
per the CF bot.
--
Michael

Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Dave Cramer
Date:
David,

On Thu, 24 Sep 2020 at 00:22, Michael Paquier <michael@paquier.xyz> wrote:
> On Tue, Sep 08, 2020 at 12:18:23PM -0700, Andres Freund wrote:
> > A test verifying that a non-transactional message in a transaction that is
> > later aborted is handled correctly would be good.
>
> On top of that, the patch needs a rebase as it visibly fails to apply,
> per the CF bot.
> --
> Michael

Where are you with this? Are you able to work on it?
Dave Cramer 

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
David Pirotte
Date:
On Tue, Nov 3, 2020 at 7:19 AM Dave Cramer <davecramer@gmail.com> wrote:
> David,
>
> On Thu, 24 Sep 2020 at 00:22, Michael Paquier <michael@paquier.xyz> wrote:
>> On Tue, Sep 08, 2020 at 12:18:23PM -0700, Andres Freund wrote:
>> > A test verifying that a non-transactional message in a transaction that is
>> > later aborted is handled correctly would be good.
>>
>> On top of that, the patch needs a rebase as it visibly fails to apply,
>> per the CF bot.
>> --
>> Michael
>
> Where are you with this? Are you able to work on it?
> Dave Cramer 

Apologies for the delay, here.

I've attached v2 of this patch which applies cleanly to master. The patch also now includes a test demonstrating that pg_logical_emit_message correctly sends non-transactional messages when called inside a transaction that is rolled back. (Thank you, Andres, for this suggestion.) The only other change is that I added this new message type into the LogicalRepMsgType enum added earlier this week.

Let me know what you think.

Cheers,
Dave 
Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Ashutosh Bapat
Date:
On Thu, Nov 5, 2020 at 9:16 AM David Pirotte <dpirotte@gmail.com> wrote:
>
> On Tue, Nov 3, 2020 at 7:19 AM Dave Cramer <davecramer@gmail.com> wrote:
>>
>> David,
>>
>> On Thu, 24 Sep 2020 at 00:22, Michael Paquier <michael@paquier.xyz> wrote:
>>>
>>> On Tue, Sep 08, 2020 at 12:18:23PM -0700, Andres Freund wrote:
>>> > A test verifying that a non-transactional message in a transaction that is
>>> > later aborted is handled correctly would be good.
>>>
>>> On top of that, the patch needs a rebase as it visibly fails to apply,
>>> per the CF bot.
>>> --
>>> Michael
>>
>>
>> Where are you with this? Are you able to work on it?
>> Dave Cramer
>
>
> Apologies for the delay, here.
>
> I've attached v2 of this patch which applies cleanly to master. The patch also now includes a test demonstrating that
> pg_logical_emit_message correctly sends non-transactional messages when called inside a transaction that is rolled back.
> (Thank you, Andres, for this suggestion.) The only other change is that I added this new message type into the
> LogicalRepMsgType enum added earlier this week.
>
> Let me know what you think.

This feature looks useful. Here are some comments.

+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+                        bool transactional, const char *prefix, Size sz,
+                        const char *message)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+

Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
used, we need to add the transaction id for transactional messages. Maybe we
add that even in the non-streaming case and use it to decide whether it's a
transactional message or not. That might save us a byte when we are adding a
transaction id.

+   /* encode and send message flags */
+   if (transactional)
+       flags |= MESSAGE_TRANSACTIONAL;
+
+   pq_sendint8(out, flags);

Is 8 bits enough considering future improvements? What if we need to use more
than 8 bit flags?

@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
            apply_handle_origin(s);
            return;

+       case LOGICAL_REP_MSG_MESSAGE:

Should we add the logical message to the WAL downstream so that it flows
further down to a cascaded logical replica? Should that be controlled
by an option?

--
Best Wishes,
Ashutosh Bapat



Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
David Pirotte
Date:
On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> wrote:
> +/*
> + * Write MESSAGE to stream
> + */
> +void
> +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
> +                        bool transactional, const char *prefix, Size sz,
> +                        const char *message)
> +{
> +   uint8       flags = 0;
> +
> +   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
> +
>
> Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
> used, we need to add the transaction id for transactional messages. Maybe we
> add that even in the non-streaming case and use it to decide whether it's a
> transactional message or not. That might save us a byte when we are adding a
> transaction id.

My preference is to add in the xid when streaming is enabled. (1) It is a more consistent implementation with the other message types, and (2) it saves 3 bytes when streaming is disabled. I've attached an updated patch. It is not a strong preference, though, if you suggest a different approach.
 
> +   /* encode and send message flags */
> +   if (transactional)
> +       flags |= MESSAGE_TRANSACTIONAL;
> +
> +   pq_sendint8(out, flags);
>
> Is 8 bits enough considering future improvements? What if we need to use more
> than 8 bit flags?

8 possible flags already sounds like a lot, here, so I suspect that a byte will be sufficient for the foreseeable future. If we needed to go beyond that, it'd be a protocol version bump. (Well, it might first warrant reflection as to why we had so many flags...)
 
> @@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
>             apply_handle_origin(s);
>             return;
>
> +       case LOGICAL_REP_MSG_MESSAGE:
>
> Should we add the logical message to the WAL downstream so that it flows
> further down to a cascaded logical replica? Should that be controlled
> by an option?

Hmm, I can't think of a use case for this, but perhaps someone could. Do you, or does anyone, have something in mind? I think we provide a lot of value with logical messages in pgoutput without supporting consumption from a downstream replica, so perhaps this is better considered separately.

If we want this, I think we would add a "messages" option on the subscription. If present, the subscriber will receive messages and pass them to any downstream subscribers. I started working on this and it does expand the change's footprint. As is, a developer would consume messages by connecting to a pgoutput slot on the message's origin (e.g. via Debezium or a custom client). The subscription and logical worker infrastructure don't know about messages, but they would need to in order to support consuming an origin's messages on a downstream logical replica. In any case, I'll keep working on it so we can see what it looks like.

Cheers,
Dave
Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Euler Taveira
Date:
On Wed, 18 Nov 2020 at 03:04, David Pirotte <dpirotte@gmail.com> wrote:
> On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> wrote:
>> +/*
>> + * Write MESSAGE to stream
>> + */
>> +void
>> +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
>> +                        bool transactional, const char *prefix, Size sz,
>> +                        const char *message)
>> +{
>> +   uint8       flags = 0;
>> +
>> +   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
>> +
>>
>> Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
>> used, we need to add the transaction id for transactional messages. Maybe we
>> add that even in the non-streaming case and use it to decide whether it's a
>> transactional message or not. That might save us a byte when we are adding a
>> transaction id.

I also reviewed your patch. This feature would be really useful for replication
scenarios. Supporting this feature means that you don't need to use a table to
pass messages from one node to another one. Here are a few comments/ideas.

> @@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
>             apply_handle_origin(s);
>             return;
>
> +       case LOGICAL_REP_MSG_MESSAGE:
> +           return;
> +

I added a comment explaining that this message is not used by logical
replication but it could possibly be useful for other applications using
pgoutput. See patch 0003.

Andres mentioned in this thread [1] that we could simplify the
parse_output_parameters. I refactored this function to pass only PGOutputData
to it and also moved enable_streaming into this struct. I use a similar approach
in wal2json; it is easier to get the options since they are available in the
logical decoding context. See patch 0004.
 
> My preference is to add in the xid when streaming is enabled. (1) It is a more consistent implementation with the other message types, and (2) it saves 3 bytes when streaming is disabled. I've attached an updated patch. It is not a strong preference, though, if you suggest a different approach.
 
I agree with this approach. The xid is available in the BEGIN message if the
MESSAGE is transactional. For non-transactional messages, the xid is not available.
Your implementation is not consistent with the other pgoutput_XXX functions,
which check in_streaming themselves and pass the xid to the functions that
require it. See patch 0005.
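The wire layout under discussion can be sketched in standalone C. This assumes the field order that was eventually documented for this message in the logical replication protocol: the 'M' type byte, an Int32 xid present only for streamed transactions, an Int8 flags byte (1 = transactional), the Int64 LSN, the NUL-terminated prefix, an Int32 content length, then the content, with integers in network byte order. Buf, the put_* helpers and write_message are hypothetical, not PostgreSQL's StringInfo/pq_send* API; as in the approach above, the caller decides whether an xid is written, with 0 standing in for InvalidTransactionId.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MSG_TYPE_MESSAGE        'M'
#define MSG_FLAG_TRANSACTIONAL  0x01

/* Fixed-capacity output buffer; sets overflow instead of writing past cap. */
typedef struct Buf
{
    uint8_t    *data;
    size_t      cap;
    size_t      len;
    bool        overflow;
} Buf;

static void
put_bytes(Buf *b, const void *src, size_t n)
{
    if (b->overflow || b->cap - b->len < n)
    {
        b->overflow = true;
        return;
    }
    memcpy(b->data + b->len, src, n);
    b->len += n;
}

static void
put_u8(Buf *b, uint8_t v)
{
    put_bytes(b, &v, 1);
}

/* Integers go out big-endian (network byte order). */
static void
put_u32(Buf *b, uint32_t v)
{
    uint8_t     tmp[4] = {(uint8_t) (v >> 24), (uint8_t) (v >> 16),
                          (uint8_t) (v >> 8), (uint8_t) v};

    put_bytes(b, tmp, sizeof(tmp));
}

static void
put_u64(Buf *b, uint64_t v)
{
    put_u32(b, (uint32_t) (v >> 32));
    put_u32(b, (uint32_t) v);
}

/*
 * xid == 0 plays the role of InvalidTransactionId: the caller passes a real
 * xid only while streaming an in-progress transaction.
 */
static void
write_message(Buf *b, uint32_t xid, bool transactional, uint64_t lsn,
              const char *prefix, const void *content, uint32_t sz)
{
    put_u8(b, MSG_TYPE_MESSAGE);
    if (xid != 0)
        put_u32(b, xid);
    put_u8(b, transactional ? MSG_FLAG_TRANSACTIONAL : 0);
    put_u64(b, lsn);
    put_bytes(b, prefix, strlen(prefix) + 1);   /* include the NUL */
    put_u32(b, sz);
    put_bytes(b, content, sz);
}
```

With prefix "p" and a 2-byte payload, this yields 18 bytes without an xid and 22 with one. Always sending an xid in place of the flags byte, as first suggested, would add 4 bytes and drop 1, i.e. 3 extra bytes per non-streamed message, which is the saving mentioned above.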

The last patch 0006 overhauls your tests. I added/changed some comments,
replaced identifiers with uppercase letters, used 'pgoutput' as prefix, checked
the prefix, and avoided a checkpoint during the test. There are possibly other
improvements that I didn't mention here. Maybe you can use encode(substr(data,
1, 1), 'escape') instead of comparing the ASCII code (77).
 
>> Should we add the logical message to the WAL downstream so that it flows
>> further down to a cascaded logical replica? Should that be controlled
>> by an option?

> Hmm, I can't think of a use case for this, but perhaps someone could. Do you, or does anyone, have something in mind? I think we provide a lot of value with logical messages in pgoutput without supporting consumption from a downstream replica, so perhaps this is better considered separately.
>
> If we want this, I think we would add a "messages" option on the subscription. If present, the subscriber will receive messages and pass them to any downstream subscribers. I started working on this and it does expand the change's footprint. As is, a developer would consume messages by connecting to a pgoutput slot on the message's origin (e.g. via Debezium or a custom client). The subscription and logical worker infrastructure don't know about messages, but they would need to in order to support consuming an origin's messages on a downstream logical replica. In any case, I'll keep working on it so we can see what it looks like.

The decision to send received messages to downstream nodes should be made by
the subscriber. If the subscriber wants to replicate messages to downstream
nodes, the worker should call LogLogicalMessage.

This does not belong to this patch but when/if this patch is committed, I will
submit a patch to filter messages by prefix. wal2json has a similar
(filter-msg-prefixes / add-msg-prefixes) feature and it is useful for cases
where you are handling multiple output plugins like wal2json and pgoutput. The
idea is to avoid sending useless messages to nodes that (i) don't know how
to process them and (ii) have no interest in them.
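A prefix filter of the kind proposed could be as simple as the following sketch. The include/exclude semantics are an assumption loosely modelled on the wal2json options named above, matching is on the exact prefix, and should_send_message is a hypothetical name, not an existing pgoutput function.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* True if prefix exactly matches one of the n entries in list. */
static bool
prefix_in_list(const char *prefix, const char *const *list, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (strcmp(prefix, list[i]) == 0)
            return true;
    return false;
}

/*
 * Hypothetical filter: exclusions always win; if an include list is given,
 * only listed prefixes pass; otherwise everything not excluded passes.
 */
static bool
should_send_message(const char *prefix,
                    const char *const *include, size_t ninclude,
                    const char *const *exclude, size_t nexclude)
{
    if (prefix_in_list(prefix, exclude, nexclude))
        return false;
    if (ninclude > 0)
        return prefix_in_list(prefix, include, ninclude);
    return true;
}
```

Letting exclusions win keeps the rule predictable when a prefix appears in both lists; an empty include list means "no restriction", so existing consumers see no change unless they opt in.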

PS> I'm attaching David's patches (0001 and 0002) again to keep cfbot happy.



--
Euler Taveira                 http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Amit Kapila
Date:
On Wed, Nov 25, 2020 at 8:58 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:
>
> On Wed, 18 Nov 2020 at 03:04, David Pirotte <dpirotte@gmail.com> wrote:
>>
>> On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> wrote:
>>>
>>> +/*
>>> + * Write MESSAGE to stream
>>> + */
>>> +void
>>> +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
>>> +                        bool transactional, const char *prefix, Size sz,
>>> +                        const char *message)
>>> +{
>>> +   uint8       flags = 0;
>>> +
>>> +   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
>>> +
>>>
>>> Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
>>> used, we need to add the transaction id for transactional messages. Maybe we
>>> add that even in the non-streaming case and use it to decide whether it's a
>>> transactional message or not. That might save us a byte when we are adding a
>>> transaction id.
>>
>>
> I also reviewed your patch. This feature would be really useful for replication
> scenarios. Supporting this feature means that you don't need to use a table to
> pass messages from one node to another one. Here are a few comments/ideas.
>

Your ideas/suggestions look good to me. Don't we need to provide a
read function corresponding to logicalrep_write_message? We have it
for other write functions. Can you please combine all of your changes
into one patch?
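For a client outside core that consumes these messages, a read function might look like the following sketch. It assumes the layout that was eventually documented for this message (type byte 'M', an Int32 xid only inside streamed transactions, Int8 flags, Int64 LSN, NUL-terminated prefix, Int32 length, content) in network byte order, and that the caller knows whether it is inside a streamed transaction; DecodedMessage, Reader and read_message are hypothetical names, not the logicalrep_read_* API. It rejects unknown flag bits rather than silently ignoring them.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct DecodedMessage
{
    uint32_t    xid;            /* 0 when the message carried no xid */
    bool        transactional;
    uint64_t    lsn;
    const char *prefix;         /* points into the input buffer */
    const uint8_t *content;     /* points into the input buffer */
    uint32_t    content_len;
} DecodedMessage;

typedef struct Reader
{
    const uint8_t *data;
    size_t      len;
    size_t      pos;
} Reader;

static bool
get_u8(Reader *r, uint8_t *v)
{
    if (r->len - r->pos < 1)
        return false;
    *v = r->data[r->pos++];
    return true;
}

/* Reads a big-endian 32-bit integer. */
static bool
get_u32(Reader *r, uint32_t *v)
{
    if (r->len - r->pos < 4)
        return false;
    const uint8_t *p = r->data + r->pos;

    *v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
         ((uint32_t) p[2] << 8) | (uint32_t) p[3];
    r->pos += 4;
    return true;
}

static bool
get_u64(Reader *r, uint64_t *v)
{
    uint32_t    hi, lo;

    if (!get_u32(r, &hi) || !get_u32(r, &lo))
        return false;
    *v = ((uint64_t) hi << 32) | lo;
    return true;
}

/* Returns true on success; false on truncated or malformed input. */
static bool
read_message(const uint8_t *data, size_t len, bool streaming,
             DecodedMessage *m)
{
    Reader      r = {data, len, 0};
    uint8_t     type, flags;

    if (!get_u8(&r, &type) || type != 'M')
        return false;
    m->xid = 0;
    if (streaming && !get_u32(&r, &m->xid))
        return false;
    if (!get_u8(&r, &flags) || (flags & ~0x01) != 0)
        return false;           /* unknown flag bits: refuse, don't guess */
    m->transactional = (flags & 0x01) != 0;
    if (!get_u64(&r, &m->lsn))
        return false;

    /* prefix: NUL-terminated string within the remaining bytes */
    const uint8_t *nul = memchr(r.data + r.pos, '\0', r.len - r.pos);

    if (nul == NULL)
        return false;
    m->prefix = (const char *) (r.data + r.pos);
    r.pos = (size_t) (nul - r.data) + 1;

    if (!get_u32(&r, &m->content_len) || r.len - r.pos < m->content_len)
        return false;
    m->content = r.data + r.pos;
    return true;
}
```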

-- 
With Regards,
Amit Kapila.



Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
"Euler Taveira"
Date:
On Thu, Apr 1, 2021, at 7:19 AM, Amit Kapila wrote:
> Your ideas/suggestions look good to me. Don't we need to provide a
> read function corresponding to logicalrep_write_message? We have it
> for other write functions. Can you please combine all of your changes
> into one patch?

Thanks for taking a look at this patch. I didn't consider a
logicalrep_read_message function because the protocol doesn't support it yet.

/*
* Logical replication does not use generic logical messages yet.
* Although, it could be used by other applications that use this
* output plugin.
*/

Someone that is inspecting the code in the future could possibly check this
discussion to understand why this function isn't available.

This new patch set has 2 patches because there are 2 separate
changes: the parse_output_parameters() refactor and logical decoding message
support.


--
Euler Taveira

Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Amit Kapila
Date:
On Sat, Apr 3, 2021 at 5:26 AM Euler Taveira <euler@eulerto.com> wrote:
>
> On Thu, Apr 1, 2021, at 7:19 AM, Amit Kapila wrote:
>
> This new patch set has 2 patches because there are 2 separate
> changes: the parse_output_parameters() refactor and logical decoding message
> support.
>

I have made a few minor changes in the attached: (a) initialized the
streaming message callback API, (b) updated the docs to reflect that the XID
can be sent when streaming in-progress transactions (I see that the same
information needs to be updated for a few other protocol messages, but we
can do that as a separate patch), and (c) slightly tweaked the commit
messages.

Let me know what you think. I am planning to push this tomorrow unless
you or someone else has any comments.

-- 
With Regards,
Amit Kapila.

Attachment

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
"Euler Taveira"
Date:
On Mon, Apr 5, 2021, at 4:06 AM, Amit Kapila wrote:
> I have made a few minor changes in the attached: (a) initialized the
> streaming message callback API, (b) updated the docs to reflect that the XID
> can be sent when streaming in-progress transactions (I see that the same
> information needs to be updated for a few other protocol messages, but we
> can do that as a separate patch), and (c) slightly tweaked the commit
> messages.

Good catch. I completely forgot about streaming of in-progress transactions. I
agree that documenting the XID for the other protocol messages should be done in
a separate patch since its scope is beyond this feature.


--
Euler Taveira

Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
Amit Kapila
Date:
On Mon, Apr 5, 2021 at 5:45 PM Euler Taveira <euler@eulerto.com> wrote:
>
> On Mon, Apr 5, 2021, at 4:06 AM, Amit Kapila wrote:
>
> I have made a few minor changes in the attached: (a) initialized the
> streaming message callback API, (b) updated the docs to reflect that the XID
> can be sent when streaming in-progress transactions (I see that the same
> information needs to be updated for a few other protocol messages, but we
> can do that as a separate patch), and (c) slightly tweaked the commit
> messages.
>
> Good catch. I completely forgot about streaming of in-progress transactions. I
> agree that documenting the XID for the other protocol messages should be done in
> a separate patch since its scope is beyond this feature.
>

I have pushed this work and updated the CF entry accordingly.

-- 
With Regards,
Amit Kapila.



Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From
"Euler Taveira"
Date:
On Wed, Apr 7, 2021, at 2:20 AM, Amit Kapila wrote:
> I have pushed this work and updated the CF entry accordingly.

Great. Thank you.


--
Euler Taveira