From 19d39a500e14af67bd987a4e2d1826aa4b8efc46 Mon Sep 17 00:00:00 2001
From: wangw
Date: Mon, 21 Mar 2022 14:04:10 +0800
Subject: [PATCH v4] Fix the timeout of subscriber in long transactions.

During logical replication, the walsender may process a large transaction
without sending any of its data to the subscriber (say, because the tables
modified in the transaction are not published).  No keepalive messages are
sent while it does so, and the subscriber can therefore time out.

In this case, send keepalive messages to the subscriber.  Calls made only
to send a keepalive do not write a sample to the lag tracker.
---
 src/backend/replication/logical/logical.c   | 45 +++++++++++++++++++--
 src/backend/replication/pgoutput/pgoutput.c | 38 ++++++++++++++---
 src/backend/replication/walsender.c         | 48 +++++++++++++++++++---
 src/include/replication/logical.h           |  4 +-
 src/include/replication/output_plugin.h     |  2 +-
 5 files changed, 120 insertions(+), 17 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..ae7ff14c95 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -680,15 +680,15 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Invoke the progress callback, passing send_keep_alive (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keep_alive);
 }
 
 /*
@@ -1930,3 +1930,42 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * Try to send a keepalive message if too many changes were skipped.
+ *
+ * While looping over the changes of a transaction (see
+ * ReorderBufferProcessTXN), a large transaction may send nothing to the
+ * subscriber for a long time.  Send a keepalive message in that case so
+ * that the subscriber does not time out.
+ */
+void
+UpdateProgress(LogicalDecodingContext *ctx, bool skipped)
+{
+	static int	skipped_changes_count = 0;
+
+	/*
+	 * A change that was not skipped ends the current run of skipped
+	 * changes, so reset skipped_changes_count.
+	 */
+	if (!skipped)
+	{
+		skipped_changes_count = 0;
+		return;
+	}
+
+	/*
+	 * After skipping SKIPPED_CHANGES_THRESHOLD changes in a row, try to
+	 * send a keepalive message.
+	 */
+#define SKIPPED_CHANGES_THRESHOLD 100
+
+	if (++skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD)
+	{
+		/* Try to send a keepalive message. */
+		OutputPluginUpdateProgress(ctx, true);
+
+		/* Whether or not one was sent, start counting afresh. */
+		skipped_changes_count = 0;
+	}
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 5fddab3a3d..c9d931888d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -475,7 +475,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -506,7 +506,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -520,7 +520,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -536,7 +536,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1149,9 +1149,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	bool		change_sent = false;
 
 	if (!is_publishable_relation(relation))
+	{
+		/* Count the skipped change; this may trigger a keepalive. */
+		UpdateProgress(ctx, true);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1169,15 +1174,27 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				/* Count the skipped change; this may trigger a keepalive. */
+				UpdateProgress(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				/* Count the skipped change; this may trigger a keepalive. */
+				UpdateProgress(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				/* Count the skipped change; this may trigger a keepalive. */
+				UpdateProgress(ctx, true);
 				return;
+			}
 			break;
 		default:
 			Assert(false);
@@ -1226,6 +1243,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary);
 			OutputPluginWrite(ctx, true);
+			change_sent = true;
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (change->data.tp.oldtuple)
@@ -1293,6 +1311,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			}
 
 			OutputPluginWrite(ctx, true);
+			change_sent = true;
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
@@ -1330,6 +1349,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				logicalrep_write_delete(ctx->out, xid, targetrel,
 										old_slot, data->binary);
 				OutputPluginWrite(ctx, true);
+				change_sent = true;
 			}
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
@@ -1338,6 +1358,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * If the change was sent, reset the skipped-changes counter; otherwise
+	 * count it as skipped, which may trigger a keepalive message.
+	 */
+	UpdateProgress(ctx, !change_sent);
+
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
@@ -1598,7 +1624,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1619,7 +1645,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2d0292a092..82daaa06a2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -249,7 +249,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1446,25 +1446,61 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 /*
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
+ * If send_keep_alive is true, try to send a keepalive message instead.
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
-	static TimestampTz sendTime = 0;
+	static TimestampTz trackTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
+	if (send_keep_alive)
+	{
+		/*
+		 * Skip the work below unless half of wal_sender_timeout has passed
+		 * since the last reply, or output is still pending.  Past that point
+		 * the receiver (after wal_receiver_timeout / 2 of silence) may have
+		 * requested a reply, so process its input and respond promptly.
+		 */
+
+		if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+											  wal_sender_timeout / 2) &&
+			!pq_is_send_pending())
+			return;
+
+		/* Check for input from the client. */
+		ProcessRepliesIfAny();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut();
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary();
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		/*
+		 * send_keep_alive is set only for changes skipped inside a
+		 * transaction (see UpdateProgress), so do not record this LSN as a
+		 * lag sample; the early return above skips lag tracking as well.
+		 */
+		return;
+	}
+
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
+	if (!TimestampDifferenceExceeds(trackTime, now,
 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 		return;
 
 	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	trackTime = now;
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..1d8ab2a56b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keep_alive
 );
 
 typedef struct LogicalDecodingContext
@@ -140,5 +141,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateProgress(LogicalDecodingContext *ctx, bool skipped);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..ed802b58ef 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.27.0