From 5a1525418743515c35bcda43e396de30e7d3d6be Mon Sep 17 00:00:00 2001 From: wangw Date: Wed, 18 Jan 2023 13:45:32 +0800 Subject: [PATCH v7] Fix the logical replication timeout during processing of DDL. When there is a DDL in a transaction that generates lots of temporary data due to rewrite rules, this temporary data will not be processed by the pgoutput plugin. This means it is possible for a timeout to occur if a sufficiently long time elapses since the last pgoutput message. A previous commit (f95d53e) fixed a similar scenario in this area, but that only fixed timeouts for DML going through pgoutput, so it did not address this DDL timeout case. To fix this, we introduced a new ReorderBuffer callback - 'ReorderBufferUpdateProgressTxnCB'. This callback is called to try to update the process whenever more than CHANGES_THRESHOLD changes are encountered while sending data of a transaction (and its subtransactions) to the output plugin. --- src/backend/replication/logical/logical.c | 72 +++++++++++++++++++ .../replication/logical/reorderbuffer.c | 4 ++ src/backend/replication/pgoutput/pgoutput.c | 54 ++------------ src/include/replication/reorderbuffer.h | 12 ++++ src/tools/pgindent/typedefs.list | 1 + 5 files changed, 95 insertions(+), 48 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1a58dd7649..ec6bc0fb28 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +/* update progress txn callback */ +static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin); /* @@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; + /* + * Callback to support updating progress during sending data of a + * transaction (and its subtransactions) to the output plugin. + */ + ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -1584,6 +1595,67 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +/* + * Update progress callback while processing a transaction. + * + * Try send a keepalive message during transaction processing. + * + * This is done because if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby), then it can + * timeout. This can happen for large DDL, or for large transactions when all + * or most of the changes are either not published or got filtered out. + */ +static void +update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + static int changes_count = 0; /* used to accumulate the number of + * changes */ + + Assert(!ctx->fast_forward); + + /* + * Trying to send keepalive message after every change has some + * overhead, but testing showed there is no noticeable overhead if + * we do it after every ~100 changes. + */ +#define CHANGES_THRESHOLD 100 + + if (++changes_count < CHANGES_THRESHOLD) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "update_progress_txn"; + state.report_location = lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + ctx->write_xid = txn->xid; + + /* + * Report this change's lsn so replies from clients can give an up-to-date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = lsn; + + ctx->end_xact = false; + + OutputPluginUpdateProgress(ctx, false); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 54ee824e6c..3760c8e3de 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -883,6 +883,8 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, { rb->message(rb, txn, lsn, false, prefix, message_size, message); + rb->update_progress_txn(rb, txn, lsn); + TeardownHistoricSnapshot(false); } PG_CATCH(); @@ -2446,6 +2448,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "tuplecid value in changequeue"); break; } + + rb->update_progress_txn(rb, txn, change->lsn); } /* speculative insertion record must be freed by now */ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 1a80d67bb9..0ba6b6b39c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); -static void update_replication_progress(LogicalDecodingContext *ctx, - bool skipped_xact); /* * Only 3 publication actions are used for row filtering ("insert", "update", @@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - update_replication_progress(ctx, !sent_begin_txn); + OutputPluginUpdateProgress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -625,7 +623,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -639,7 +637,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - update_replication_progress(ctx, false); - if (!is_publishable_relation(relation)) return; @@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - if (!data->messages) return; @@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, } } } - -/* - * Try to update progress and send a keepalive message if too many changes were - * processed. - * - * For a large transaction, if we don't send any change to the downstream for a - * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. - * This can happen when all or most of the changes are either not published or - * got filtered out. - */ -static void -update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) -{ - static int changes_count = 0; - - /* - * We don't want to try sending a keepalive message after processing each - * change as that can have overhead. Tests revealed that there is no - * noticeable overhead in doing it after continuously processing 100 or so - * changes. - */ -#define CHANGES_THRESHOLD 100 - - /* - * If we are at the end of transaction LSN, update progress tracking. - * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we - * try to send a keepalive message if required. - */ - if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) - { - OutputPluginUpdateProgress(ctx, skipped_xact); - changes_count = 0; - } -} diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index f6c4dd75db..a971524090 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -525,6 +525,12 @@ typedef void (*ReorderBufferStreamTruncateCB) ( Relation relations[], ReorderBufferChange *change); +/* update progress txn callback signature */ +typedef void (*ReorderBufferUpdateProgressTxnCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + struct ReorderBuffer { /* @@ -588,6 +594,12 @@ struct ReorderBuffer ReorderBufferStreamMessageCB stream_message; ReorderBufferStreamTruncateCB stream_truncate; + /* + * Callback to be called when updating progress during sending data of a + * transaction (and its subtransactions) to the output plugin. + */ + ReorderBufferUpdateProgressTxnCB update_progress_txn; + /* * Pointer that will be passed untouched to the callbacks. */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 51484ca7e2..9b6431b59f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2311,6 +2311,7 @@ ReorderBufferToastEnt ReorderBufferTupleBuf ReorderBufferTupleCidEnt ReorderBufferTupleCidKey +ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId ReparameterizeForeignPathByChild_function -- 2.28.0.windows.1