From e5484c1e46457103f8e3d3a5e126ae4825c80fc6 Mon Sep 17 00:00:00 2001
From: Ajin Cherian
Date: Tue, 21 May 2024 23:57:55 -0400
Subject: [PATCH v2] Reduce useless changes before reassemble during logical replication.

In order to reduce unnecessary logical replication work, changes to
irrelevant relations can be filtered out before the transaction's changes
are queued for reassembly.  This effectively reduces useless changes and
prevents storage space from being filled up with irrelevant data.

By design, a callback LogicalDecodeFilterByRelCB is added for the output
plugin.  We implemented a function pgoutput_table_filter for pgoutput, and
RelationSyncCache is reused to determine whether a change to a given
relation should be filtered out.  Following the implementation of the
function pgoutput_change, currently only insert/update/delete changes can be
filtered; other types of changes are not considered.  Perhaps a more
detailed analysis can be done so that more kinds of changes can be filtered.

Most of the code in the FilterByTable function is transplanted from the
ReorderBufferProcessTXN function, and it can be called before the
ReorderBufferQueueChange function.  It also encapsulates the call to the
callback function filter_by_table_cb through its wrapper in logical.c.

In general, this patch concentrates the filtering of changes done in the
ReorderBufferProcessTXN function and the pgoutput_change function into the
FilterByTable function, and calls it before the ReorderBufferQueueChange
function.

Author: Lijie
---
 src/backend/replication/logical/decode.c    | 157 +++++++++++++++++++++++++++-
 src/backend/replication/logical/logical.c   |  31 ++++++
 src/backend/replication/pgoutput/pgoutput.c |  89 ++++++++++------
 src/include/replication/logical.h           |   2 +
 src/include/replication/output_plugin.h     |   7 ++
 src/test/subscription/t/034_table_filter.pl |  91 ++++++++++++++++
 6 files changed, 339 insertions(+), 38 deletions(-)
 create mode 100644 src/test/subscription/t/034_table_filter.pl

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7a86f84..01b43cc 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -33,12 +33,14 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
+#include "common/string.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standbydefs.h"
+#include "utils/relfilenumbermap.h"
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -578,6 +580,181 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
+/*
+ * Decide, before a change is queued in the reorder buffer, whether it can be
+ * discarded because the output plugin is not interested in its relation.
+ *
+ * Returns true if the change was filtered out; in that case it has already
+ * been returned to the reorder buffer via ReorderBufferReturnChange() and the
+ * caller must neither use nor queue it.  Returns false if the change has to be
+ * queued as usual.
+ *
+ * This is only an optimization, so whenever the owning relation cannot be
+ * determined the change is kept and left for replay-time processing.
+ *
+ * NOTE(review): the catalog lookups below run in a plain (sub)transaction; no
+ * historic snapshot is set up in this function, so they presumably see the
+ * current catalog contents rather than those valid when the change was made.
+ * Confirm that this is acceptable.
+ */
+static bool
+FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change)
+{
+	ReorderBuffer *rb = ctx->reorder;
+	Relation	relation = NULL;
+	SnapBuild  *builder = ctx->snapshot_builder;
+	Oid			reloid;
+	bool		result = false;
+	bool		using_subtxn;
+
+	/* Nothing to do if the output plugin did not register a table filter. */
+	if (ctx->callbacks.filter_by_table_cb == NULL)
+		return false;
+
+	/* Do not filter before the snapshot builder has reached consistency. */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
+		return false;
+
+	/* Only insert/update/delete changes are candidates for filtering. */
+	switch (change->action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+		case REORDER_BUFFER_CHANGE_UPDATE:
+		case REORDER_BUFFER_CHANGE_DELETE:
+			break;
+		default:
+			return false;
+	}
+
+	/*
+	 * Decoding needs access to syscaches et al., which in turn use
+	 * heavyweight locks and such. Thus we need to have enough state around to
+	 * keep track of those. The easiest way is to simply use a transaction
+	 * internally. That also allows us to easily enforce that nothing writes
+	 * to the database by checking for xid assignments.
+	 *
+	 * When we're called via the SQL SRF there's already a transaction
+	 * started, so start an explicit subtransaction there.
+	 */
+	using_subtxn = IsTransactionOrTransactionBlock();
+
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by table");
+	else
+		StartTransactionCommand();
+
+	reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
+								  change->data.tp.rlocator.relNumber);
+
+	/*
+	 * If the relfilenumber cannot be mapped to a relation we cannot tell
+	 * whether the change is wanted, so keep it instead of discarding it.
+	 */
+	if (reloid == InvalidOid)
+		goto filter_done;
+
+	relation = RelationIdGetRelation(reloid);
+
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+			 reloid,
+			 relpathperm(change->data.tp.rlocator,
+						 MAIN_FORKNUM));
+
+	if (!RelationIsLogicallyLogged(relation))
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	/*
+	 * Ignore temporary heaps created during DDL unless the plugin has asked
+	 * for them.
+	 */
+	if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	/*
+	 * For now ignore sequence changes entirely. Most of the time they don't
+	 * log changes using records we understand, so it doesn't make sense to
+	 * handle the few cases we do.
+	 */
+	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	if (IsToastRelation(relation))
+	{
+		/*
+		 * The plugin is asked about the table owning the TOAST relation.  Its
+		 * OID is derived from the TOAST relation's name, which is expected to
+		 * look like "pg_toast_<owner OID>".  Parse it as an unsigned value so
+		 * that OIDs above INT_MAX are handled, and reject anything that is not
+		 * a plain decimal number.
+		 */
+		static const char toast_prefix[] = "pg_toast_";
+		const size_t prefix_len = sizeof(toast_prefix) - 1;
+		char	   *toast_name = RelationGetRelationName(relation);
+		char	   *endptr = NULL;
+		unsigned long parsed = 0;
+		bool		parsed_ok = false;
+
+		if (strncmp(toast_name, toast_prefix, prefix_len) == 0 &&
+			toast_name[prefix_len] >= '0' && toast_name[prefix_len] <= '9')
+		{
+			errno = 0;
+			parsed = strtoul(toast_name + prefix_len, &endptr, 10);
+			parsed_ok = (errno == 0 && *endptr == '\0' &&
+						 parsed != InvalidOid && parsed <= PG_UINT32_MAX);
+		}
+
+		if (!parsed_ok)
+			elog(DEBUG1, "cannot get the real table oid for toast table %s",
+				 toast_name);
+
+		RelationClose(relation);
+		relation = NULL;
+
+		/* Unknown or unopenable owner: keep the change. */
+		if (!parsed_ok)
+			goto filter_done;
+
+		relation = RelationIdGetRelation((Oid) parsed);
+		if (!RelationIsValid(relation))
+			goto filter_done;
+	}
+
+	result = filter_by_table_cb_wrapper(ctx, relation, change);
+
+filter_done:
+
+	if (result && RelationIsValid(relation))
+		elog(DEBUG1, "logical filter change by table %s", RelationGetRelationName(relation));
+
+	if (RelationIsValid(relation))
+		RelationClose(relation);
+
+	/*
+	 * Abort rather than commit, mirroring ReorderBufferProcessTXN(), from
+	 * which this sequence was transplanted.
+	 */
+	AbortCurrentTransaction();
+
+	if (using_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+
+	/* A filtered change is never queued, so give it back right away. */
+	if (result)
+		ReorderBufferReturnChange(rb, change, false);
+
+	return result;
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
*/ @@ -937,6 +1071,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; + if (FilterByTable(ctx, change)) + return; + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); @@ -1006,6 +1143,9 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; + if (FilterByTable(ctx, change)) + return; + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); } @@ -1062,6 +1202,9 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; + if (FilterByTable(ctx, change)) + return; + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); } @@ -1198,11 +1341,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else change->data.tp.clear_toast_afterwards = false; - ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), - buf->origptr, change, false); - /* move to the next xl_multi_insert_tuple entry */ data += datalen; + + if (FilterByTable(ctx, change)) + continue; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), + buf->origptr, change, false); } Assert(data == tupledata + tuplelen); } @@ -1237,6 +1383,9 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = true; + if (FilterByTable(ctx, change)) + return; + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 97a4d99..49ac184 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1242,6 +1242,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) return ret; } +bool 
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change) +{ + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_by_table"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + ctx->end_xact = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_by_table_cb(ctx, relation, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d2b35cf..3d1de74 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -56,6 +56,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pgoutput_table_filter(struct LogicalDecodingContext *ctx, + Relation relation, + ReorderBufferChange *change); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, @@ -258,6 +261,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_prepared_cb = pgoutput_commit_prepared_txn; cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; + cb->filter_by_table_cb = pgoutput_table_filter; cb->shutdown_cb = 
pgoutput_shutdown; /* transaction streaming */ @@ -1414,9 +1418,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - if (!is_publishable_relation(relation)) - return; - /* * Remember the xid for the change in streaming mode. We need to send xid * with each change in the streaming mode so that subscriber can make @@ -1427,37 +1428,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, xid = change->txn->xid; relentry = get_rel_sync_entry(data, relation); - - /* First check the table filter */ - switch (action) - { - case REORDER_BUFFER_CHANGE_INSERT: - if (!relentry->pubactions.pubinsert) - return; - break; - case REORDER_BUFFER_CHANGE_UPDATE: - if (!relentry->pubactions.pubupdate) - return; - break; - case REORDER_BUFFER_CHANGE_DELETE: - if (!relentry->pubactions.pubdelete) - return; - - /* - * This is only possible if deletes are allowed even when replica - * identity is not defined for a table. Since the DELETE action - * can't be published, we simply return. - */ - if (!change->data.tp.oldtuple) - { - elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); - return; - } - break; - default: - Assert(false); - } - /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -1684,6 +1654,57 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, } /* + * Return true if the relation has not been published, false otherwise. 
+ */ +static bool +pgoutput_table_filter(struct LogicalDecodingContext *ctx, + Relation relation, + ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + RelationSyncEntry *relentry; + ReorderBufferChangeType action = change->action; + + if (!is_publishable_relation(relation)) + return true; + + relentry = get_rel_sync_entry(data, relation); + + /* First check the table filter */ + switch (action) + { + case REORDER_BUFFER_CHANGE_INSERT: + if (!relentry->pubactions.pubinsert) + return true; + break; + case REORDER_BUFFER_CHANGE_UPDATE: + if (!relentry->pubactions.pubupdate) + return true; + break; + case REORDER_BUFFER_CHANGE_DELETE: + if (!relentry->pubactions.pubdelete) + return true; + + /* + * This is only possible if deletes are allowed even when replica + * identity is not defined for a table. Since the DELETE action + * can't be published, we simply return. + */ + if (!change->data.tp.oldtuple) + { + elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + return true; + } + break; + default: + Assert(false); + } + + return false; +} + + +/* * Shutdown the output plugin. 
* * Note, we don't need to clean the data->context and data->cachectx as diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index aff38e8..dae1411 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -145,6 +145,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern bool filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, + ReorderBufferChange *change); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 44988eb..030eb5a 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -97,6 +97,12 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct RepOriginId origin_id); /* + * Filter changes by table. + */ +typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx, + Relation relation, ReorderBufferChange *change); + +/* * Called to shutdown an output plugin. 
*/ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); @@ -222,6 +228,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; + LogicalDecodeFilterByRelCB filter_by_table_cb; LogicalDecodeShutdownCB shutdown_cb; /* streaming of changes at prepare time */ diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl new file mode 100644 index 0000000..8d4f962 --- /dev/null +++ b/src/test/subscription/t/034_table_filter.pl @@ -0,0 +1,91 @@ +# Copyright (c) 2021-2023, PostgreSQL Global Development Group + +# Basic logical replication test +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "create table tbl_pub(id int, val1 text, val2 text,size int);"); +$node_publisher->safe_psql('postgres', + "create table tbl_t1(id int, val1 text, val2 text,size int);"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;"); +$node_publisher->safe_psql('postgres', +qq( +CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$ +DECLARE + replication_record pg_stat_replication; +BEGIN + LOOP + SELECT * + INTO replication_record + FROM pg_stat_replication + WHERE application_name = 'mysub'; + + IF replication_record.replay_lsn = replication_record.write_lsn THEN + EXIT; + END IF; + + PERFORM pg_sleep(1); + 
END LOOP; +END; +\$\$ LANGUAGE plpgsql;)); + +# Create some preexisting content on subscriber +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "create table tbl_pub(id int, val1 text, val2 text,size text);"); +$node_subscriber->safe_psql('postgres', + "create table tbl_t1(id int, val1 text, val2 text,size text);"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub'); + +# test filter +$node_publisher->safe_psql('postgres', +qq(BEGIN; +insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub'); +insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,9999) i; +update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001; +select check_replication_status(); +insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub'); +COMMIT;) +); + +my $minsize = $node_publisher->safe_psql('postgres', + "select size from tbl_t1 order by size asc limit 1;"); +my $maxsize = $node_publisher->safe_psql('postgres', + "select size from tbl_t1 order by size desc limit 1;"); +is($minsize, $maxsize, 'check decode filter table between maxsize and minsize'); + + +my $fristrow = $node_publisher->safe_psql('postgres', + "select size from tbl_t1 where id = 1;"); +my $lastrow = $node_publisher->safe_psql('postgres', + "select size from tbl_t1 where id = 10001;"); +is($minsize, $maxsize, 'check decode filter table between fristrow and lastrow'); + +is($minsize, $lastrow, 'check decode filter table between minsize and lastrow'); + +print "minsize: " . $minsize . "maxsize: " . $maxsize ."fristrow: " . $fristrow ."lastrow: " . $lastrow . 
"\n"; + +done_testing(); \ No newline at end of file -- 1.8.3.1