From 9b226ad6e63dd541d7b26fe1565fb7d8459336e8 Mon Sep 17 00:00:00 2001 From: Shi Yu Date: Thu, 1 Dec 2022 11:27:32 +0800 Subject: [PATCH v58 5/7] Add GUC stream_serialize_threshold and test serializing messages to disk --- doc/src/sgml/config.sgml | 32 +++++++++++ .../replication/logical/applyparallelworker.c | 54 ++++++++++++------- src/backend/replication/logical/worker.c | 9 ++++ src/backend/utils/misc/guc_tables.c | 14 +++++ src/include/replication/worker_internal.h | 4 ++ .../t/032_stream_parallel_conflict.pl | 29 ++++++++++ 6 files changed, 122 insertions(+), 20 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index b7c84d1e5e..a9b0c6cdcd 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11582,6 +11582,38 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) + + stream_serialize_threshold (integer) + + stream_serialize_threshold configuration parameter + + + + + Forces the leader apply worker to serialize messages to files after + sending the specified amount of streaming chunks to the parallel apply + worker. Setting this to zero serializes all messages. A value of + -1 (the default) disables this feature. This is + intended to test serialization to files with + streaming = parallel. + + + + When the logical replication subscription streaming + parameter is set to parallel, the leader apply worker + sends messages to parallel workers with a timeout. By default, the + leader apply worker will serialize the remaining messages to files if + the timeout is exceeded. If this option is set to any value other than + -1, messages are serialized to files even without a timeout. + + + + This parameter can only be set in the postgresql.conf + file or on the server command line. 
+ + + + diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index b9d84d5b27..a49d08c70a 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -252,6 +252,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL; /* A list to maintain subtransactions, if any. */ static List *subxactlist = NIL; +/* GUC variable */ +int stream_serialize_threshold = -1; + static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); @@ -1174,32 +1177,43 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data, for (;;) { - result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true); + bool force_serialize = (stream_serialize_threshold != -1 && + (stream_serialize_threshold == 0 || + stream_serialize_threshold < parallel_stream_nchunks)); - if (result == SHM_MQ_SUCCESS) - break; - else if (result == SHM_MQ_DETACHED) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send data to shared-memory queue"))); + if (!force_serialize) + { + result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true); - Assert(result == SHM_MQ_WOULD_BLOCK); + if (result == SHM_MQ_SUCCESS) + break; + else if (result == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to shared-memory queue"))); - /* Wait before retrying. */ - rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - SHM_SEND_RETRY_INTERVAL_MS, - WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + Assert(result == SHM_MQ_WOULD_BLOCK); - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); + /* Wait before retrying. 
*/ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + SHM_SEND_RETRY_INTERVAL_MS, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (startTime == 0) + startTime = GetCurrentTimestamp(); } + else if (stream_serialize_threshold != 0) + parallel_stream_nchunks = 0; - if (startTime == 0) - startTime = GetCurrentTimestamp(); - else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), + if (force_serialize || + TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), SHM_SEND_TIMEOUT_MS)) { StringInfoData msg; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4b760a6a8e..81f21fbd58 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -326,6 +326,12 @@ static TransactionId stream_xid = InvalidTransactionId; */ static uint32 parallel_stream_nchanges = 0; +/* + * The number of streaming chunks sent by leader apply worker during one + * streamed transaction. This is only used when stream_serialize_threshold > 0. + */ +uint32 parallel_stream_nchunks = 0; + /* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. @@ -1556,6 +1562,9 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + if (stream_serialize_threshold > 0) + parallel_stream_nchunks++; + /* * Unlock the shared object lock so that the parallel apply worker * can continue to receive changes. 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 21b125fb26..df0e30b35e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -61,6 +61,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -3003,6 +3004,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"stream_serialize_threshold", PGC_SIGHUP, DEVELOPER_OPTIONS, + gettext_noop("Forces the leader apply worker to serialize messages " + "to files after sending specified amount of streaming " + "chunks in streaming parallel mode."), + gettext_noop("A value of -1 disables this feature."), + GUC_NOT_IN_SAMPLE + }, + &stream_serialize_threshold, + -1, -1, INT_MAX, + NULL, NULL, NULL + }, + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 2756590adb..9bdc664546 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -211,6 +211,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +extern PGDLLIMPORT int stream_serialize_threshold; + +extern PGDLLIMPORT uint32 parallel_stream_nchunks; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); diff --git a/src/test/subscription/t/032_stream_parallel_conflict.pl b/src/test/subscription/t/032_stream_parallel_conflict.pl index 10efd2d376..62d58cec3c 100644 --- a/src/test/subscription/t/032_stream_parallel_conflict.pl +++ b/src/test/subscription/t/032_stream_parallel_conflict.pl @@ -146,6 +146,35 @@ $result = 
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(10000), 'data replicated to subscriber after dropping index'); +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); + +# ============================================================================ +# Test serializing messages to disk +# ============================================================================ + +# Set stream_serialize_threshold to zero, so the messages will be serialized to disk. +$node_subscriber->safe_psql('postgres', + 'ALTER SYSTEM SET stream_serialize_threshold = 0;'); +$node_subscriber->reload; + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i)"); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), 'data replicated to subscriber by serializing messages to disk'); + $node_subscriber->stop; $node_publisher->stop; -- 2.23.0.windows.1