From 4dc345bb9e9e09620b9d0b6cd5c630b43b206895 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 14 Dec 2022 20:38:50 +0800 Subject: [PATCH v62 5/8] Add GUC stream_serialize_threshold and test serializing messages to disk --- doc/src/sgml/config.sgml | 32 +++++ .../replication/logical/applyparallelworker.c | 12 ++ src/backend/replication/logical/worker.c | 9 ++ src/backend/utils/misc/guc_tables.c | 14 ++ src/include/replication/worker_internal.h | 4 + .../subscription/t/032_stream_parallel_conflict.pl | 141 ++++++++++++++++++++- 6 files changed, 208 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 0359db1..8d4122f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11616,6 +11616,38 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) + + stream_serialize_threshold (integer) + + stream_serialize_threshold configuration parameter + + + + + Forces the leader apply worker to serialize messages to files after + sending the specified number of streaming chunks to the parallel apply + worker. Setting this to zero serializes all messages. A value of + -1 (the default) disables this feature. This is + intended to test serialization to files with + streaming = parallel. + + + + When the logical replication subscription streaming + parameter is set to parallel, the leader apply worker + sends messages to parallel workers with a timeout. By default, the + leader apply worker will serialize the remaining messages to files if + the timeout is exceeded. If this option is set to any value other than + -1, messages are serialized to files even without a + timeout. + + + + This parameter can only be set in the postgresql.conf + file or on the server command line. 
+ + + + diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 942b762..593d850 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -252,6 +252,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL; /* A list to maintain subtransactions, if any. */ static List *subxactlist = NIL; +/* GUC variable */ +int stream_serialize_threshold = -1; + static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); @@ -1155,6 +1158,15 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); + /* Force to serialize messages if stream_serialize_threshold is reached. */ + if (stream_serialize_threshold != -1 && + (stream_serialize_threshold == 0 || + stream_serialize_threshold < parallel_stream_nchunks)) + { + parallel_stream_nchunks = 0; + return false; + } + #define SHM_SEND_RETRY_INTERVAL_MS 1000 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 596ed7d..df67851 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -327,6 +327,12 @@ static TransactionId stream_xid = InvalidTransactionId; static uint32 parallel_stream_nchanges = 0; /* + * The number of streaming chunks sent by leader apply worker during one + * streamed transaction. This is only used when stream_serialize_threshold > 0. + */ +uint32 parallel_stream_nchunks = 0; + +/* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. 
* Once we start skipping changes, we don't stop it until we skip all changes of @@ -1517,6 +1523,9 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + if (stream_serialize_threshold > 0) + parallel_stream_nchunks++; + /* * Once we start serializing the changes, the parallel apply worker * will wait for the leader to release the stream lock until the diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 21b125f..df0e30b 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -61,6 +61,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -3004,6 +3005,19 @@ struct config_int ConfigureNamesInt[] = }, { + {"stream_serialize_threshold", PGC_SIGHUP, DEVELOPER_OPTIONS, + gettext_noop("Forces the leader apply worker to serialize messages " + "to files after sending specified amount of streaming " + "chunks in streaming parallel mode."), + gettext_noop("A value of -1 disables this feature."), + GUC_NOT_IN_SAMPLE + }, + &stream_serialize_threshold, + -1, -1, INT_MAX, + NULL, NULL, NULL + }, + + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " "log file rotation."), diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8fe8a11..849b36c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -210,6 +210,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +extern PGDLLIMPORT int stream_serialize_threshold; + +extern PGDLLIMPORT uint32 parallel_stream_nchunks; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker 
*logicalrep_worker_find(Oid subid, Oid relid, bool only_running); diff --git a/src/test/subscription/t/032_stream_parallel_conflict.pl b/src/test/subscription/t/032_stream_parallel_conflict.pl index 10efd2d..59e7e75 100644 --- a/src/test/subscription/t/032_stream_parallel_conflict.pl +++ b/src/test/subscription/t/032_stream_parallel_conflict.pl @@ -13,8 +13,11 @@ my $offset = 0; # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); -$node_publisher->append_conf('postgresql.conf', - 'logical_decoding_work_mem = 64kB'); +$node_publisher->append_conf( + 'postgresql.conf', qq( + logical_decoding_work_mem = 64kB + max_prepared_transactions = 10 +)); $node_publisher->start; # Create subscriber node @@ -24,7 +27,11 @@ $node_subscriber->init; # Check if any streaming chunks are applied using the parallel apply worker. We # have to look for the DEBUG1 log messages about that, so bump up the log # verbosity. -$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->append_conf( + 'postgresql.conf', qq( + log_min_messages = debug1 + max_prepared_transactions = 10 +)); $node_subscriber->start; @@ -47,7 +54,7 @@ $node_subscriber->safe_psql( CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub - WITH (streaming = parallel, copy_data = false)"); + WITH (streaming = parallel, two_phase = on, copy_data = false)"); $node_publisher->wait_for_catchup($appname); @@ -146,6 +153,132 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(10000), 'data replicated to subscriber after dropping index'); +# Clean up test data from the environment. 
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); + +# ============================================================================ +# Test serializing messages to disk +# ============================================================================ + +# Set stream_serialize_threshold to zero, so the messages will be serialized to disk. +$node_subscriber->safe_psql('postgres', + 'ALTER SYSTEM SET stream_serialize_threshold = 0;'); +$node_subscriber->reload; + +# Serialize the COMMIT transaction. +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i)"); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), 'data replicated to subscriber by serializing messages to disk'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); + +# Serialize the PREPARE transaction. +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i); + PREPARE TRANSACTION 'xact'; + }); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'xact';"); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), 'data replicated to subscriber by serializing messages to disk'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); + +# Serialize the ABORT top-transaction. +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i); + ROLLBACK; + }); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(0), 'data replicated to subscriber by serializing messages to disk'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); + +# Serialize the ABORT sub-transaction. +# Check the subscriber log from now on. 
+$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i); + SAVEPOINT sp; + INSERT INTO test_tab SELECT i FROM generate_series(5001, 10000) s(i); + ROLLBACK TO sp; + COMMIT; + }); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that only sub-transaction is aborted on subscriber. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), 'data replicated to subscriber by serializing messages to disk'); + $node_subscriber->stop; $node_publisher->stop; -- 2.7.2.windows.1