Re: Proposal: Generic WAL logical messages - Mailing list pgsql-hackers
From | Petr Jelinek |
---|---|
Subject | Re: Proposal: Generic WAL logical messages |
Date | |
Msg-id | 56F2454A.1050803@2ndquadrant.com Whole thread Raw |
In response to | Re: Proposal: Generic WAL logical messages (Andres Freund <andres@anarazel.de>) |
Responses |
Re: Proposal: Generic WAL logical messages
|
List | pgsql-hackers |
On 22/03/16 14:11, Andres Freund wrote: > On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote: >> On 22/03/16 12:47, Andres Freund wrote: >>> On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote: >>> >>>> + >>>> + <sect3 id="logicaldecoding-output-plugin-message"> >>>> + <title>Generic Message Callback</title> >>>> + >>>> + <para> >>>> + The optional <function>message_cb</function> callback is called whenever >>>> + a logical decoding message has been decoded. >>>> +<programlisting> >>>> +typedef void (*LogicalDecodeMessageCB) ( >>>> + struct LogicalDecodingContext *, >>>> + ReorderBufferTXN *txn, >>>> + XLogRecPtr message_lsn, >>>> + const char *prefix, >>>> + Size message_size, >>>> + const char *message >>>> +); >>> >>> I see you removed the transactional parameter. I'm doubtful that that's >>> a good idea: It seems like it'd be rather helpful to pass the >>> transaction for a nontransaction message that's emitted while an xid was >>> assigned? >>> >> >> Hmm but won't that give the output plugin even transactions that were later >> aborted? That seems quite different behavior from how the txn parameter >> works everywhere else. > > Seems harmless to me if called out. > All right, after some consideration I agree. > >>> >>>> +/* >>>> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). >>>> + */ >>>> +static void >>>> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) >>>> +{ >>>> + SnapBuild *builder = ctx->snapshot_builder; >>>> + XLogReaderState *r = buf->record; >>>> + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; >>>> + xl_logical_message *message; >>>> + >>>> + if (info != XLOG_LOGICAL_MESSAGE) >>>> + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); >>>> + >>>> + message = (xl_logical_message *) XLogRecGetData(r); >>>> + >>>> + if (message->transactional) >>>> + { >>>> + if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr)) >>>> + return; >>>> + >>>> + ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r), >>>> + buf->endptr, >>>> + message->message, /* first part of message is prefix */ >>>> + message->message_size, >>>> + message->message + message->prefix_size); >>>> + } >>>> + else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT && >>>> + !SnapBuildXactNeedsSkip(builder, buf->origptr)) >>>> + { >>>> + volatile Snapshot snapshot_now; >>>> + ReorderBuffer *rb = ctx->reorder; >>>> + >>>> + /* setup snapshot to allow catalog access */ >>>> + snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r)); >>>> + SetupHistoricSnapshot(snapshot_now, NULL); >>>> + rb->message(rb, NULL, buf->origptr, message->message, >>>> + message->message_size, >>>> + message->message + message->prefix_size); >>>> + TeardownHistoricSnapshot(false); >>>> + } >>>> +} >>> >>> A number of things: >>> 1) The SnapBuildProcessChange needs to be toplevel, not just for >>> transactional messages - we can't yet necessarily build a snapshot. >> >> Nope, the snapshot state is checked in the else if. >> >>> 2) I'm inclined to move even the non-transactional stuff to reorderbuffer. >> >> Well, it's not doing anything with reorderbuffer but sure it can be done >> (didn't do it in the attached though). > > I think there'll be some interactions if we ever do some parts in > parallel and such. I'd rather keep decode.c to only do the lowest level > stuff. > Did it that way but I do like the resulting code less. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
Attachment
pgsql-hackers by date: