Re: Proposal: Generic WAL logical messages - Mailing list pgsql-hackers

From Andres Freund
Subject Re: Proposal: Generic WAL logical messages
Msg-id 20160322114701.GE3790@awork2.anarazel.de
In response to Re: Proposal: Generic WAL logical messages  (Petr Jelinek <petr@2ndquadrant.com>)
Responses Re: Proposal: Generic WAL logical messages
List pgsql-hackers
On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> Just noticed there is missing symlink in the pg_xlogdump.

>  create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
>  create mode 120000 src/bin/pg_xlogdump/logicalmsgdesc.c

Uh, src/bin/pg_xlogdump/logicalmsgdesc.c shouldn't be there. The symlink
is supposed to be automatically created by the Makefile.

Were you perhaps confused because it showed up in git status? If so,
that's probably because it isn't in
src/bin/pg_xlogdump/.gitignore. Perhaps we should change that file to
ignore *desc.c?

> +      <row>
> +       <entry id="pg-logical-emit-message-text">
> +        <indexterm>
> +         <primary>pg_logical_emit_message</primary>
> +        </indexterm>
> +        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter><type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> +       </entry>
> +       <entry>
> +        void
> +       </entry>
> +       <entry>
> +        Write text logical decoding message. This can be used to pass generic
> +        messages to logical decoding plugins through WAL. The parameter
> +        <parameter>transactional</parameter> specifies if the message should
> +        be part of current transaction or if it should be written immediately
> +        and decoded as soon as the logical decoding reads the record. The
> +        <parameter>prefix</parameter> is textual prefix used by the logical
> +        decoding plugins to easily recognize interesting messages for them.
> +        The <parameter>content</parameter> is the text of the message.
> +       </entry>
> +      </row>

s/write/emit/?

> +
> +    <sect3 id="logicaldecoding-output-plugin-message">
> +     <title>Generic Message Callback</title>
> +
> +     <para>
> +      The optional <function>message_cb</function> callback is called whenever
> +      a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

I see you removed the transactional parameter. I'm doubtful that's a
good idea. Wouldn't it be rather helpful to pass the transaction for a
non-transactional message that's emitted while an xid is
assigned?
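To make the suggestion concrete, here is a minimal, self-contained sketch of a message callback that keeps a `transactional` flag next to `txn`. The typedef name mirrors the quoted patch, but the types below are simplified stand-ins rather than PostgreSQL's real headers, and `example_message_cb` and `last_output` are hypothetical. The point is only that a plugin can tell the two kinds of message apart while still seeing the xid that was assigned when a non-transactional message was emitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-ins for PostgreSQL types (NOT the real definitions). */
typedef uint64_t XLogRecPtr;
typedef size_t Size;
typedef uint32_t TransactionId;
typedef struct ReorderBufferTXN
{
    TransactionId xid;
} ReorderBufferTXN;
struct LogicalDecodingContext;

/* Callback shape with the transactional flag kept next to txn. */
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

/* Hypothetical: where the example callback records what it saw. */
static char last_output[256];

static void
example_message_cb(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                   XLogRecPtr message_lsn, bool transactional,
                   const char *prefix, Size message_size, const char *message)
{
    (void) ctx;
    (void) message_lsn;
    assert(prefix != NULL);

    /*
     * txn may be non-NULL even for a non-transactional message if an xid
     * had been assigned when it was emitted; NULL means there was none.
     * The payload is not NUL-terminated, so print exactly message_size bytes.
     */
    snprintf(last_output, sizeof(last_output),
             "%s xid=%u prefix=%s content=%.*s",
             transactional ? "tx" : "non-tx",
             txn != NULL ? (unsigned) txn->xid : 0u,
             prefix, (int) message_size, message);
}
```

Rendering "no transaction" as `xid=0` is a choice of this sketch only; a real plugin would decide for itself how to report a NULL `txn`.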


> +/*
> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> + */
> +static void
> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> +    SnapBuild  *builder = ctx->snapshot_builder;
> +    XLogReaderState *r = buf->record;
> +    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> +    xl_logical_message *message;
> +
> +    if (info != XLOG_LOGICAL_MESSAGE)
> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> +
> +    message = (xl_logical_message *) XLogRecGetData(r);
> +
> +    if (message->transactional)
> +    {
> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> +            return;
> +
> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> +                                  buf->endptr,
> +                                  message->message, /* first part of message is prefix */
> +                                  message->message_size,
> +                                  message->message + message->prefix_size);
> +    }
> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
> +    {
> +        volatile Snapshot    snapshot_now;
> +        ReorderBuffer       *rb = ctx->reorder;
> +
> +        /* setup snapshot to allow catalog access */
> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> +        SetupHistoricSnapshot(snapshot_now, NULL);
> +        rb->message(rb, NULL, buf->origptr, message->message,
> +                    message->message_size,
> +                    message->message + message->prefix_size);
> +        TeardownHistoricSnapshot(false);
> +    }
> +}

A number of things:
1) The SnapBuildProcessChange() check needs to happen at the top level, not just for transactional messages: we can't necessarily build a snapshot yet.
2) I'm inclined to move even the non-transactional handling into the reorder buffer.
3) This lacks error handling; we surely don't want to error out while the historic snapshot is still set up.
4) Without 3), the volatile is bogus.
5) It misses a ReorderBufferProcessXid() call.
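Points 3) and 4) go together: once the callback can raise an error, the historic snapshot has to be torn down on the error path as well, and only then does `volatile` matter, for locals that are modified after the jump point and read in the handler. The sketch below models that pattern with plain `setjmp`/`longjmp` in place of PostgreSQL's `PG_TRY()`/`PG_CATCH()`; every function and variable in it is a hypothetical stand-in, not backend code.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the backend's global state. */
static bool historic_snapshot_active = false;
static jmp_buf *error_context = NULL;   /* innermost error handler */

static void setup_historic_snapshot(void) { historic_snapshot_active = true; }
static void teardown_historic_snapshot(void) { historic_snapshot_active = false; }

/* Hypothetical error raise: jump to the innermost handler. */
static void
raise_error(void)
{
    assert(error_context != NULL);
    longjmp(*error_context, 1);
}

typedef void (*message_cb_fn) (bool fail);

/* Sample plugin callback that errors out on request. */
static void
sample_cb(bool fail)
{
    if (fail)
        raise_error();
}

/*
 * Run the callback with the historic snapshot set up, making sure the
 * snapshot is torn down on both the success and the error path.
 * Returns true if the callback completed normally.
 */
static bool
invoke_with_snapshot(message_cb_fn cb, bool fail)
{
    jmp_buf         local;
    jmp_buf        *save = error_context;   /* not modified after setjmp */
    volatile bool   snapshot_set = false;   /* modified after setjmp, read on error path */

    if (setjmp(local) == 0)
    {
        error_context = &local;
        setup_historic_snapshot();
        snapshot_set = true;
        cb(fail);
        teardown_historic_snapshot();
        error_context = save;
        return true;
    }

    /* Error path: undo the snapshot before propagating. */
    if (snapshot_set)
        teardown_historic_snapshot();
    error_context = save;
    return false;               /* backend code would re-throw here */
}
```

In the backend the error path would end with `PG_RE_THROW()` so the error still propagates; returning `false` here just keeps the sketch testable.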

> + * Every message carries prefix to avoid conflicts between different decoding
> + * plugins. The prefix has to be registered before the message using that
> + * prefix can be written to XLOG. The prefix can be registered exactly once to
> + * avoid situation where multiple third party extensions try to use same
> + * prefix.

Outdated afaics?


> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>                  change->data.tp.oldtuple = NULL;
>              }
>              break;
> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            if (change->data.msg.prefix != NULL)
> +                pfree(change->data.msg.prefix);
> +            change->data.msg.prefix = NULL;
> +            if (change->data.msg.message != NULL)
> +                pfree(change->data.msg.message);
> +            change->data.msg.message = NULL;
> +            break;

Hm, this will have some overhead, but I guess the messages won't be
super frequent, and usually not very large.

> +/*
> + * Queue message into a transaction so it can be processed upon commit.
> + */
> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> +                          const char *prefix, Size msg_sz, const char *msg)
> +{
> +    ReorderBufferChange *change;
> +
> +    Assert(xid != InvalidTransactionId);
> +
> +    change = ReorderBufferGetChange(rb);
> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +    change->data.msg.prefix = pstrdup(prefix);
> +    change->data.msg.message_size = msg_sz;
> +    change->data.msg.message = palloc(msg_sz);
> +    memcpy(change->data.msg.message, msg, msg_sz);
> +
> +    ReorderBufferQueueChange(rb, xid, lsn, change);
> +}

I'm not sure right now whether there's any guarantee that the current
memory context is meaningful here. IIRC other long-lived allocations
explicitly use a specific context?
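One way to read the memory-context question: the copies of the prefix and payload must outlive whatever context happens to be current when the record is decoded, so they would be allocated from a context owned by the reorder buffer (in backend terms, something like `MemoryContextAlloc()` on such a context, or a `MemoryContextSwitchTo()` around the `pstrdup()`/`palloc()` calls). The sketch below models that with a tiny bump arena; `Arena`, `arena_alloc` and `queue_message_copy` are hypothetical names, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a long-lived memory context: a bump arena. */
typedef struct Arena
{
    char       *base;
    size_t      used;
    size_t      cap;
} Arena;

/* Allocate n bytes (rounded up to a multiple of 8); NULL if the arena is full. */
static void *
arena_alloc(Arena *a, size_t n)
{
    size_t      aligned = (n + 7) & ~(size_t) 7;
    void       *p;

    assert(a->used <= a->cap);
    if (a->cap - a->used < aligned)
        return NULL;            /* backend code would ERROR instead */
    p = a->base + a->used;
    a->used += aligned;
    return p;
}

typedef struct QueuedMessage
{
    char       *prefix;         /* NUL-terminated copy */
    char       *message;        /* not NUL-terminated */
    size_t      message_size;
} QueuedMessage;

/*
 * Copy prefix and payload into the context passed in explicitly, rather
 * than into whatever context is current. Returns 0 on success.
 */
static int
queue_message_copy(Arena *ctx, QueuedMessage *out,
                   const char *prefix, const char *msg, size_t msg_sz)
{
    size_t      prefix_size = strlen(prefix) + 1;

    out->prefix = arena_alloc(ctx, prefix_size);
    out->message = arena_alloc(ctx, msg_sz);
    if (out->prefix == NULL || out->message == NULL)
        return -1;
    memcpy(out->prefix, prefix, prefix_size);
    memcpy(out->message, msg, msg_sz);
    out->message_size = msg_sz;
    return 0;
}
```

Passing the context as a parameter makes the lifetime of the copies explicit at every call site instead of depending on the caller's state.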

> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                char       *data;
> +                size_t        prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> +                sz += prefix_size + change->data.msg.message_size;
> +                ReorderBufferSerializeReserve(rb, sz);
> +
> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> +                memcpy(data, change->data.msg.prefix,
> +                       prefix_size);
> +                memcpy(data + prefix_size, change->data.msg.message,
> +                       change->data.msg.message_size);
> +                break;
> +            }

Can you please include the sizes of the blocks explicitly, rather than
relying on 0 termination?
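A sketch of what an explicit-size layout could look like: each block is preceded by its length, so the reader never has to call `strlen()` on spilled data. The functions below are hypothetical and operate on a plain buffer rather than `rb->outbuf`; the prefix still keeps its terminating NUL so it can be used as a C string after it is read back.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Bytes needed for [prefix_size][prefix][message_size][message]. */
static size_t
msg_serialized_size(size_t prefix_size, size_t message_size)
{
    return sizeof(size_t) + prefix_size + sizeof(size_t) + message_size;
}

/* Write both blocks, each preceded by its explicit length; returns bytes written. */
static size_t
msg_serialize(char *out, const char *prefix, size_t prefix_size,
              const char *message, size_t message_size)
{
    char       *data = out;

    /* prefix_size counts the terminating NUL */
    assert(prefix_size > 0 && prefix[prefix_size - 1] == '\0');

    memcpy(data, &prefix_size, sizeof(size_t));
    data += sizeof(size_t);
    memcpy(data, prefix, prefix_size);
    data += prefix_size;

    memcpy(data, &message_size, sizeof(size_t));
    data += sizeof(size_t);
    memcpy(data, message, message_size);
    data += message_size;

    return (size_t) (data - out);
}

/* Read the blocks back; the returned pointers point into the buffer. */
static size_t
msg_deserialize(const char *in, const char **prefix, size_t *prefix_size,
                const char **message, size_t *message_size)
{
    const char *data = in;

    memcpy(prefix_size, data, sizeof(size_t));
    data += sizeof(size_t);
    *prefix = data;
    data += *prefix_size;

    memcpy(message_size, data, sizeof(size_t));
    data += sizeof(size_t);
    *message = data;
    data += *message_size;

    return (size_t) (data - in);
}
```

With both lengths on disk, the amount to reserve before writing is simply the two length words plus the two payloads, which is what `msg_serialized_size()` computes.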


> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>  PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>  PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)

Did you consider doing this via the standby rmgr instead?

> +typedef struct xl_logical_message
> +{
> +    bool        transactional;                    /* is message transactional? */
> +    size_t        prefix_size;                    /* length of prefix */
> +    size_t        message_size;                    /* size of the message */
> +    char        message[FLEXIBLE_ARRAY_MEMBER];    /* message including the null
> +                                                 * terminated prefx of length
> +                                                 * prefix_size */
> +} xl_logical_message;
>

"prefx".

Greetings,

Andres Freund


