Re: Proposal: Generic WAL logical messages - Mailing list pgsql-hackers
From | Andres Freund |
---|---|
Subject | Re: Proposal: Generic WAL logical messages |
Date | |
Msg-id | 20160227000509.2zms6difabot7lys@alap3.anarazel.de |
In response to | Re: Proposal: Generic WAL logical messages (Petr Jelinek <petr@2ndquadrant.com>) |
Responses |
Re: Proposal: Generic WAL logical messages
Re: Proposal: Generic WAL logical messages |
List | pgsql-hackers |
Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it is actually supposed to
accomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now? Also, iterating through a
linked list every time something is logged doesn't seem very
satisfying?

On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
> +SET synchronous_commit = on;
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
> + ?column?
> +----------
> + init
> +(1 row)
> +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
> + ?column?
> +----------
> + msg1
> +(1 row)

Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?

> +     <row>
> +      <entry id="pg-logical-send-message-text">
> +       <indexterm>
> +        <primary>pg_logical_send_message</primary>
> +       </indexterm>
> +       <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter><type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> +      </entry>
> +      <entry>
> +       void
> +      </entry>
> +      <entry>
> +       Write text logical decoding message. This can be used to pass generic
> +       messages to logical decoding plugins through WAL. The parameter
> +       <parameter>transactional</parameter> specifies if the message should
> +       be part of current transaction or if it should be written and decoded
> +       immediately. The <parameter>prefix</parameter> has to be prefix which
> +       was registered by a plugin. The <parameter>content</parameter> is
> +       content of the message.
> +      </entry>
> +     </row>

It's not decoded immediately, even if emitted non-transactionally.

> +   <sect3 id="logicaldecoding-output-plugin-message">
> +    <title>Generic Message Callback</title>
> +
> +    <para>
> +     The optional <function>message_cb</function> callback is called whenever
> +     a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    bool transactional,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

We should at least document what txn is set to if not transactional.

> +void
> +logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> +{
> +    char *rec = XLogRecGetData(record);
> +    xl_logical_message *xlrec = (xl_logical_message *) rec;
> +
> +    appendStringInfo(buf, "%s message size %zu bytes",
> +                     xlrec->transactional ? "transactional" : "nontransactional",
> +                     xlrec->message_size);
> +}

Shouldn't we check
    uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    if (info == XLOG_LOGICAL_MESSAGE)
here?

> +const char *
> +logicalmsg_identify(uint8 info)
> +{
> +    return NULL;
> +}

Huh?

> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> +                          bool transactional, const char *prefix, Size msg_sz,
> +                          const char *msg)
> +{
> +    ReorderBufferTXN *txn = NULL;
> +
> +    if (transactional)
> +    {
> +        ReorderBufferChange *change;
> +
> +        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> +
> +        Assert(xid != InvalidTransactionId);
> +        Assert(txn != NULL);
> +
> +        change = ReorderBufferGetChange(rb);
> +        change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +        change->data.msg.transactional = true;
> +        change->data.msg.prefix = pstrdup(prefix);
> +        change->data.msg.message_size = msg_sz;
> +        change->data.msg.message = palloc(msg_sz);
> +        memcpy(change->data.msg.message, msg, msg_sz);
> +
> +        ReorderBufferQueueChange(rb, xid, lsn, change);
> +    }
> +    else
> +    {
> +        rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> +    }
> +}

This approach prohibits catalog access when processing a
nontransactional message, as there's no snapshot set up.
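
The info check asked about above for logicalmsg_desc() and
logicalmsg_identify() could look roughly like the following. This is a
standalone sketch, not the patch: the SKETCH_ constants stand in for
XLR_INFO_MASK and XLOG_LOGICAL_MESSAGE with assumed values, the struct
is a trimmed-down hypothetical version of xl_logical_message, and a
plain char buffer replaces StringInfo.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Stand-ins for the PostgreSQL definitions; the values are assumptions. */
#define SKETCH_XLR_INFO_MASK        0x0F
#define SKETCH_XLOG_LOGICAL_MESSAGE 0x00

/* Hypothetical, trimmed-down version of the patch's xl_logical_message. */
typedef struct sketch_logical_message
{
    bool        transactional;
    size_t      message_size;
} sketch_logical_message;

/* Name the record type, or return NULL for an unknown info value. */
const char *
sketch_logicalmsg_identify(uint8_t info)
{
    if ((info & ~SKETCH_XLR_INFO_MASK) == SKETCH_XLOG_LOGICAL_MESSAGE)
        return "MESSAGE";

    return NULL;
}

/* Describe the record only if it really is a logical message. */
void
sketch_logicalmsg_desc(char *buf, size_t buflen, uint8_t info,
                       const sketch_logical_message *xlrec)
{
    assert(buf != NULL && buflen > 0);
    buf[0] = '\0';

    /* Unknown record type: leave the description empty. */
    if ((info & ~SKETCH_XLR_INFO_MASK) != SKETCH_XLOG_LOGICAL_MESSAGE)
        return;

    snprintf(buf, buflen, "%s message size %zu bytes",
             xlrec->transactional ? "transactional" : "nontransactional",
             xlrec->message_size);
}
```

With that shape, an unrecognized info value yields NULL from the
identify routine and an empty description, instead of interpreting
arbitrary record data as a logical message.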

> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                char *data;
> +                size_t prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> +                sz += prefix_size + change->data.msg.message_size;
> +                ReorderBufferSerializeReserve(rb, sz);
> +
> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> +                memcpy(data, change->data.msg.prefix,
> +                       prefix_size);
> +                memcpy(data + prefix_size, change->data.msg.message,
> +                       change->data.msg.message_size);
> +                break;
> +            }
>          case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
>              {
>                  Snapshot snap;
> @@ -2354,6 +2415,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
>                  data += len;
>              }
>              break;
> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                Size message_size = change->data.msg.message_size;
> +                Size prefix_size = strlen(data) + 1;
> +
> +                change->data.msg.prefix = pstrdup(data);
> +                change->data.msg.message = palloc(message_size);
> +                memcpy(change->data.msg.message, data + prefix_size,
> +                       message_size);
> +
> +                data += prefix_size + message_size;
> +            }

Please add a test exercising these paths.

Greetings,

Andres Freund
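
One way to reason about the serialize and restore paths quoted above is
a round trip over the layout they use: the NUL-terminated prefix
followed by the raw message bytes. The following is a minimal
standalone sketch, assuming the message size is already known on
restore (the quoted code reads it from change->data.msg.message_size).
sketch_msg and both functions are hypothetical names, and malloc stands
in for palloc.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the msg member of ReorderBufferChange. */
typedef struct sketch_msg
{
    char       *prefix;
    size_t      message_size;
    char       *message;
} sketch_msg;

/*
 * Write the NUL-terminated prefix, then the raw message bytes.
 * The caller must provide a large enough buffer. Returns bytes written.
 */
size_t
sketch_serialize(const sketch_msg *msg, char *out)
{
    size_t      prefix_size = strlen(msg->prefix) + 1;

    memcpy(out, msg->prefix, prefix_size);
    memcpy(out + prefix_size, msg->message, msg->message_size);

    return prefix_size + msg->message_size;
}

/*
 * Rebuild prefix and message from the buffer. msg->message_size must be
 * set by the caller beforehand. Returns bytes consumed, or 0 if an
 * allocation fails.
 */
size_t
sketch_restore(sketch_msg *msg, const char *data)
{
    size_t      prefix_size = strlen(data) + 1;

    msg->prefix = malloc(prefix_size);
    /* Avoid malloc(0), whose result is implementation-defined. */
    msg->message = malloc(msg->message_size > 0 ? msg->message_size : 1);
    if (msg->prefix == NULL || msg->message == NULL)
    {
        free(msg->prefix);
        free(msg->message);
        msg->prefix = NULL;
        msg->message = NULL;
        return 0;
    }

    memcpy(msg->prefix, data, prefix_size);
    memcpy(msg->message, data + prefix_size, msg->message_size);

    return prefix_size + msg->message_size;
}
```

This only checks the byte layout. A regression test for the patch would
presumably still need a transaction large enough to be spilled to disk
so that the real code paths run.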