From 1969704830317946825ebd1b3c23f7fc80a3157d Mon Sep 17 00:00:00 2001 From: Shi Yu Date: Mon, 24 Oct 2022 15:52:14 +0800 Subject: [PATCH v1] Allow streaming every change without waiting till logical_decoding_work_mem. Add a new GUC force_stream_mode, when it is set on, send the change to output plugin immediately in streaming mode. Otherwise, send until logical_decoding_work_mem is exceeded. --- doc/src/sgml/config.sgml | 16 ++++++++++ .../replication/logical/reorderbuffer.c | 31 ++++++++++++++----- src/backend/utils/misc/guc_tables.c | 10 ++++++ src/include/replication/reorderbuffer.h | 1 + 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index ff6fcd902a..874d1fc6bc 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11540,6 +11540,22 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) + + force_stream_mode (boolean) + + force_stream_mode configuration parameter + + + + + Specifies wheather to force sending the changes to output plugin + immediately in streaming mode. If set to off (the + default), send until logical_decoding_work_mem is + exceeded. + + + + diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 31f7381f2d..848dd7e12f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -209,6 +209,12 @@ typedef struct ReorderBufferDiskChange int logical_decoding_work_mem; static const Size max_changes_in_memory = 4096; /* XXX for restore only */ +/* + * Wheather to send the change to output plugin immediately in streaming mode. + * When it is off, wait until logical_decoding_work_mem is exceeded. + */ +bool force_stream_mode; + /* --------------------------------------- * primary reorderbuffer support routines * --------------------------------------- @@ -3527,20 +3533,29 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) + /* + * Stream the changes immediately if force_stream_mode is on and the output + * plugin supports streaming. Otherwise wait until size exceeds + * logical_decoding_work_mem. + */ + bool force_stream = (force_stream_mode && ReorderBufferCanStream(rb)); + + /* bail out if force_stream is false and we haven't exceeded the memory limit */ + if (!force_stream && rb->size < logical_decoding_work_mem * 1024L) return; /* - * Loop until we reach under the memory limit. One might think that just - * by evicting the largest (sub)transaction we will come under the memory - * limit based on assumption that the selected transaction is at least as - * large as the most recent change (which caused us to go over the memory - * limit). However, that is not true because a user can reduce the + * If force_stream is true, loop until there's no change. Otherwise, loop + * until we reach under the memory limit. One might think that just by + * evicting the largest (sub)transaction we will come under the memory limit + * based on assumption that the selected transaction is at least as large as + * the most recent change (which caused us to go over the memory limit). + * However, that is not true because a user can reduce the * logical_decoding_work_mem to a smaller value before the most recent * change. */ - while (rb->size >= logical_decoding_work_mem * 1024L) + while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) || + (force_stream && rb->size > 0)) { /* * Pick the largest transaction (or subtransaction) and evict it from diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 1bf14eec66..015a88abb3 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1247,6 +1247,16 @@ struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"force_stream_mode", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Force sending the changes to output plugin immediately if streaming is supported, without waiting till logical_decoding_work_mem."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &force_stream_mode, + false, + NULL, NULL, NULL + }, { {"log_duration", PGC_SUSET, LOGGING_WHAT, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b23d8cc4f9..669dc3a40e 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -18,6 +18,7 @@ #include "utils/timestamp.h" extern PGDLLIMPORT int logical_decoding_work_mem; +extern PGDLLIMPORT bool force_stream_mode; /* an individual tuple, stored in one chunk of memory */ typedef struct ReorderBufferTupleBuf -- 2.24.0.windows.2