From 5c34a71a1e4eac1a31fd39ae2c1659b09281f978 Mon Sep 17 00:00:00 2001 From: wangw Date: Wed, 26 Jan 2022 10:19:18 +0800 Subject: [PATCH] Fix the timeout of subscriber in long transactions. --- src/backend/replication/logical/logical.c | 4 +-- src/backend/replication/pgoutput/pgoutput.c | 21 ++++++++++---- src/backend/replication/walsender.c | 31 +++++++++++++++++---- src/include/replication/logical.h | 3 +- src/include/replication/output_plugin.h | 2 +- 5 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 9bc3a2d8de..89e745809b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -672,12 +672,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) * Update progress tracking (if supported). */ void -OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive) { if (!ctx->update_progress) return; - ctx->update_progress(ctx, ctx->write_location, ctx->write_xid); + ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keep_alive); } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index af8d51aee9..3a8f6c27ad 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -419,7 +419,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -450,7 +450,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -464,7 +464,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -480,7 +480,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -654,15 +654,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + OutputPluginUpdateProgress(ctx, true); return; + } break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) + { + OutputPluginUpdateProgress(ctx, true); return; + } break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) + { + OutputPluginUpdateProgress(ctx, true); return; + } break; default: Assert(false); @@ -1011,7 +1020,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1032,7 +1041,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 655760fee3..69fd172a7c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,7 +248,7 @@ static long WalSndComputeSleeptime(TimestampTz now); static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); -static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); +static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); @@ -1446,24 +1446,45 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * LogicalDecodingContext 'update_progress' callback. * * Write the current position to the lag tracker (see XLogSendPhysical). + * If send_keep_alive is true, send keepalive message to standby. */ static void -WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive) { - static TimestampTz sendTime = 0; + static TimestampTz trackTime = 0; TimestampTz now = GetCurrentTimestamp(); + if (send_keep_alive) + { + /* + * If half of wal_sender_timeout has lapsed without send message standby, + * send a keep-alive message to the standby. + */ + static TimestampTz sendTime = 0; + TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime, + wal_sender_timeout / 2); + if (now >= ping_time) + { + WalSndKeepalive(false); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + sendTime = now; + } + } + /* * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to * avoid flooding the lag tracker when we commit frequently. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (!TimestampDifferenceExceeds(sendTime, now, + if (!TimestampDifferenceExceeds(trackTime, now, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) return; LagTrackerWrite(lsn, now); - sendTime = now; + trackTime = now; } /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 1097cc9799..ae16068f52 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr, XLogRecPtr Ptr, - TransactionId xid + TransactionId xid, + bool send_keep_alive ); typedef struct LogicalDecodingContext diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 41157fda7c..c29c4b13d3 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks /* Functions in replication/logical/logical.c */ extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); -extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx); +extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive); #endif /* OUTPUT_PLUGIN_H */ -- 2.18.4