From 911370c47e340cfe3135c62a85a2aa15d099e071 Mon Sep 17 00:00:00 2001 From: Shi Yu Date: Mon, 24 Oct 2022 15:52:14 +0800 Subject: [PATCH v65 3/7] Allow streaming every change without waiting till logical_decoding_work_mem. Add a new GUC force_stream_mode, when it is set on, send the change to output plugin immediately in streaming mode. Otherwise, send until logical_decoding_work_mem is exceeded. --- contrib/test_decoding/expected/stream.out | 13 +++ contrib/test_decoding/sql/stream.sql | 8 ++ doc/src/sgml/config.sgml | 16 ++++ .../replication/logical/reorderbuffer.c | 82 ++++++++++++------- src/backend/utils/misc/guc_tables.c | 10 +++ src/include/replication/reorderbuffer.h | 1 + 6 files changed, 100 insertions(+), 30 deletions(-) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index 0f21dcb8e0..b0bb467b2b 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -106,6 +106,19 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl committing streamed transaction (17 rows) +-- streaming test with force_stream_mode +SET force_stream_mode=on; +TRUNCATE table stream_test; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +------------------------------------------ + opening a streamed block for transaction + streaming truncate for transaction + closing a streamed block for transaction + committing streamed transaction +(4 rows) + +RESET force_stream_mode; DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index 4feec62972..a5a63e71fa 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -44,5 +44,13 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +-- streaming test with force_stream_mode +SET force_stream_mode=on; + +TRUNCATE table stream_test; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + +RESET force_stream_mode; + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 104cce71f0..946efc515d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11623,6 +11623,22 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) + + force_stream_mode (boolean) + + force_stream_mode configuration parameter + + + + + Specifies whether to force sending the changes to output plugin + immediately in streaming mode. If set to off (the + default), send until logical_decoding_work_mem is + exceeded. + + + + diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8d5bbaadb1..50e800b6e7 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -209,6 +209,12 @@ typedef struct ReorderBufferDiskChange int logical_decoding_work_mem; static const Size max_changes_in_memory = 4096; /* XXX for restore only */ +/* + * Whether to send the change to output plugin immediately in streaming mode. + * When it is off, wait until logical_decoding_work_mem is exceeded. + */ +bool force_stream_mode; + /* --------------------------------------- * primary reorderbuffer support routines * --------------------------------------- @@ -3543,7 +3549,9 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) /* * Check whether the logical_decoding_work_mem limit was reached, and if yes * pick the largest (sub)transaction at-a-time to evict and spill its changes to - * disk until we reach under the memory limit. + * disk or send to output plugin until we reach under the memory limit. + * + * If force_stream_mode is enabled, send all streamable changes. * * XXX At this point we select the transactions until we reach under the memory * limit, but we might also adapt a more elaborate eviction strategy - for example @@ -3555,50 +3563,64 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; - /* bail out if we haven't exceeded the memory limit */ - if (rb->size < logical_decoding_work_mem * 1024L) - return; - /* - * Loop until we reach under the memory limit. One might think that just - * by evicting the largest (sub)transaction we will come under the memory - * limit based on assumption that the selected transaction is at least as - * large as the most recent change (which caused us to go over the memory - * limit). However, that is not true because a user can reduce the - * logical_decoding_work_mem to a smaller value before the most recent - * change. + * If possible, evict streamable transactions from memory by streaming. */ - while (rb->size >= logical_decoding_work_mem * 1024L) + if (ReorderBufferCanStartStreaming(rb)) { /* - * Pick the largest transaction (or subtransaction) and evict it from - * memory by streaming, if possible. Otherwise, spill to disk. + * If force_stream_mode is off, loop until we reach under the memory + * limit. Otherwise loop until there's no streamable top transaction. + * One might think that just by evicting the largest transaction we + * will come under the memory limit based on assumption that the + * selected transaction is at least as large as the most recent change + * (which caused us to go over the memory limit). However, that is not + * true because a user can reduce the logical_decoding_work_mem to a + * smaller value or enable force_stream_mode before the most recent + * change. */ - if (ReorderBufferCanStartStreaming(rb) && - (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) + while (rb->size >= logical_decoding_work_mem * 1024L || + (force_stream_mode && rb->size > 0)) { - /* we know there has to be one, because the size is not zero */ + txn = ReorderBufferLargestStreamableTopTXN(rb); + if (txn == NULL) + break; + Assert(txn && !txn->toptxn); Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); ReorderBufferStreamTXN(rb, txn); - } - else - { + /* - * Pick the largest transaction (or subtransaction) and evict it - * from memory by serializing it to disk. + * After eviction, the transaction should have no entries in + * memory, and should use 0 bytes for changes. */ - txn = ReorderBufferLargestTXN(rb); + Assert(txn->size == 0); + Assert(txn->nentries_mem == 0); + } + } - /* we know there has to be one, because the size is not zero */ - Assert(txn); - Assert(txn->size > 0); - Assert(rb->size >= txn->size); + /* + * Evict transactions from memory by serializing to disk. Loop until we + * reach under the memory limit. One might think that just by evicting the + * largest (sub)transaction we will come under the memory limit, but this + * is not sufficient. See comments above. + */ + while (rb->size >= logical_decoding_work_mem * 1024L) + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); - ReorderBufferSerializeTXN(rb, txn); - } + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferSerializeTXN(rb, txn); /* * After eviction, the transaction should have no entries in memory, diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index bc56c32373..77519a56fe 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1262,6 +1262,16 @@ struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"force_stream_mode", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Force sending the changes to output plugin immediately if streaming is supported, without waiting till logical_decoding_work_mem."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &force_stream_mode, + false, + NULL, NULL, NULL + }, { {"log_duration", PGC_SUSET, LOGGING_WHAT, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index f0eb6c4f74..f24c886e1b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -18,6 +18,7 @@ #include "utils/timestamp.h" extern PGDLLIMPORT int logical_decoding_work_mem; +extern PGDLLIMPORT bool force_stream_mode; /* an individual tuple, stored in one chunk of memory */ typedef struct ReorderBufferTupleBuf -- 2.31.1