From 9bfb34dad67745f1155238a4b11bbe1d28284b50 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 20 May 2024 03:19:56 +0000 Subject: [PATCH] WIP: implement no-op filter --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/filter.out | 0 contrib/test_decoding/meson.build | 1 + contrib/test_decoding/sql/filter.sql | 13 +++++++++++++ contrib/test_decoding/test_decoding.c | 18 ++++++++++++++++++ src/backend/replication/logical/decode.c | 2 +- 6 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 contrib/test_decoding/expected/filter.out create mode 100644 contrib/test_decoding/sql/filter.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..b9b6034b9f 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream filter ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot diff --git a/contrib/test_decoding/expected/filter.out b/contrib/test_decoding/expected/filter.out new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0faf..539a148708 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -41,6 +41,7 @@ tests += { 'stats', 'twophase', 'twophase_stream', + 'filter', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/sql/filter.sql b/contrib/test_decoding/sql/filter.sql new file mode 100644 index 0000000000..4cd4f1d824 --- 
/dev/null +++ b/contrib/test_decoding/sql/filter.sql @@ -0,0 +1,13 @@ +-- predictability +SET synchronous_commit = on; + +-- define table which will be filtered +CREATE TABLE test_tobeskipped (a int); + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +INSERT INTO test_tobeskipped VALUES (generate_series(1, 10)); + +-- check that changes to test_tobeskipped are not output + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 7c50d13969..6f0d076305 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -68,6 +68,9 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pg_decode_filter_table(struct LogicalDecodingContext *ctx, + Relation relation, + ReorderBufferChange *change); static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, @@ -133,6 +136,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; + cb->filter_by_table_cb = pg_decode_filter_table; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; cb->filter_prepare_cb = pg_decode_filter_prepare; @@ -467,6 +471,20 @@ pg_decode_filter(LogicalDecodingContext *ctx, return false; } +/* + * Filter out by the table. + * + * It is implemented for demonstrating and testing purposes. If the table + * contains the "_tobeskipped" substring, then we filter out. 
+ */ +static bool +pg_decode_filter_table(struct LogicalDecodingContext *ctx, + Relation relation, + ReorderBufferChange *change) +{ + return false; +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f2b2e33521..4266a9b353 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -589,7 +589,7 @@ FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change) bool result = false; bool using_subtxn; - if (ctx->callbacks.filter_by_origin_cb == NULL) + if (ctx->callbacks.filter_by_table_cb == NULL) return false; switch (change->action) -- 2.43.0