From ed264cf219d5b879fcaae942c8bc9321cca9e6c0 Mon Sep 17 00:00:00 2001 From: Shi Yu Date: Mon, 24 Oct 2022 15:52:14 +0800 Subject: [PATCH] Allow streaming every change instead of waiting till logical_decoding_work_mem. Add a new GUC, force_parallel_stream_mode. When it is set to on, changes are sent to the output plugin immediately in streaming mode. Otherwise, changes are sent only after logical_decoding_work_mem is exceeded. Currently, it is intended for testing the parallel apply patch. --- doc/src/sgml/config.sgml | 14 +++++++++ .../replication/logical/reorderbuffer.c | 31 ++++++++++++++----- src/backend/utils/misc/guc_tables.c | 9 ++++++ src/backend/utils/misc/postgresql.conf.sample | 3 ++ src/include/replication/reorderbuffer.h | 1 + 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a6aa1acc20..4d3b23b764 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4342,6 +4342,20 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + force_parallel_stream_mode (boolean) + + force_parallel_stream_mode configuration parameter + + + + + Specifies whether to force sending the change to output plugin immediately in streaming mode. + If set to off (the default), changes are sent only after logical_decoding_work_mem is exceeded. + + + + diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 696a96ff0d..62bfd3b70a 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -209,6 +209,12 @@ typedef struct ReorderBufferDiskChange int logical_decoding_work_mem; static const Size max_changes_in_memory = 4096; /* XXX for restore only */ +/* + * Whether to send the change to output plugin immediately in streaming mode. + * When it is off, wait until logical_decoding_work_mem is exceeded. 
+ */ +bool force_parallel_stream_mode; + /* --------------------------------------- * primary reorderbuffer support routines * --------------------------------------- @@ -3542,20 +3548,29 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) + /* + * Stream the changes immediately if force_parallel_stream_mode is on and + * the output plugin supports streaming. Otherwise wait until size exceeds + * logical_decoding_work_mem. + */ + bool force_stream = force_parallel_stream_mode && ReorderBufferCanStream(rb); + + /* bail out if force_stream is false and we haven't exceeded the memory limit */ + if (!force_stream && rb->size < logical_decoding_work_mem * 1024L) return; /* - * Loop until we reach under the memory limit. One might think that just - * by evicting the largest (sub)transaction we will come under the memory - * limit based on assumption that the selected transaction is at least as - * large as the most recent change (which caused us to go over the memory - * limit). However, that is not true because a user can reduce the + * If force_stream is true, loop until there's no change. Otherwise, loop + * until we reach under the memory limit. One might think that just by + * evicting the largest (sub)transaction we will come under the memory limit + * based on assumption that the selected transaction is at least as large as + * the most recent change (which caused us to go over the memory limit). + * However, that is not true because a user can reduce the * logical_decoding_work_mem to a smaller value before the most recent * change. 
*/ - while (rb->size >= logical_decoding_work_mem * 1024L) + while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) || + (force_stream && rb->size > 0)) { /* * Pick the largest transaction (or subtransaction) and evict it from diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f3de4ff7db..bfca9a8c07 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1007,6 +1007,15 @@ struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"force_parallel_stream_mode", PGC_USERSET, REPLICATION_SENDING, + gettext_noop("Force sending the change to output plugin immediately if streaming is supported, install of waiting till logical_decoding_work_mem."), + NULL + }, + &force_parallel_stream_mode, + false, + NULL, NULL, NULL + }, { {"ssl", PGC_SIGHUP, CONN_AUTH_SSL, gettext_noop("Enables SSL connections."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 4d01ca9931..f2b3c10e1f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -314,6 +314,9 @@ #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) +#force_parallel_stream_mode = off # if on, force sending the changes to output + # plugin immediately if streaming is supported. Otherwise, + # wait until logical_decoding_work_mem is exceeded. 
# - Primary Server - diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1549116338..5557d7ccb1 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -18,6 +18,7 @@ #include "utils/timestamp.h" extern PGDLLIMPORT int logical_decoding_work_mem; +extern PGDLLIMPORT bool force_parallel_stream_mode; /* an individual tuple, stored in one chunk of memory */ typedef struct ReorderBufferTupleBuf -- 2.24.0.windows.2