From 9b102ac120313b4443e15a7d3d46a59a8b263ff3 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 19 Jun 2023 08:28:00 +0000 Subject: [PATCH v2] Allow to use Hash index on subscriber --- src/backend/executor/execReplication.c | 69 ++++++++++++++-- src/backend/replication/logical/relation.c | 27 +++++-- src/backend/utils/cache/lsyscache.c | 22 +++++ src/include/executor/executor.h | 3 + src/include/utils/lsyscache.h | 1 + src/test/subscription/meson.build | 1 + src/test/subscription/t/034_hash.pl | 93 ++++++++++++++++++++++ 7 files changed, 204 insertions(+), 12 deletions(-) create mode 100644 src/test/subscription/t/034_hash.pl diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 9dd7168461..023585b619 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -19,6 +19,8 @@ #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "catalog/pg_am_d.h" +#include "commands/defrem.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" @@ -41,15 +43,67 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq); +/* + * Return the strategy number which corresponds to the equality operator for + * the given index access method. + * + * TODO: support other indexes: GiST, SP-GiST, other user-defined indexes. + */ +int +get_equal_strategy_number_for_am(Oid am) +{ + int ret; + + switch (am) + { + case BTREE_AM_OID: + ret = BTEqualStrategyNumber; + break; + case HASH_AM_OID: + ret = HTEqualStrategyNumber; + break; + case GIST_AM_OID: + case SPGIST_AM_OID: + /* TODO */ + case GIN_AM_OID: + case BRIN_AM_OID: + /* + * XXX: GIN and BRIN are not supported for the moment because they do not + * implement the amgettuple API. + */ + default: + /* XXX: Do we have to support extended indexes? 
*/ + ret = InvalidStrategy; + break; + } + + return ret; +} + +/* + * Return the appropriate strategy number which corresponds to the equality + * operator. + * + * TODO: support other indexes: GiST, SP-GiST, other user-defined indexes. + */ +int +get_equal_strategy_number(Oid opclass) +{ + Oid am = get_opclass_method(opclass); + + return get_equal_strategy_number_for_am(am); +} + + /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that * is setup to match 'rel' (*NOT* idxrel!). * * Returns how many columns to use for the index scan. * - * This is not generic routine, it expects the idxrel to be a btree, non-partial - * and have at least one column reference (i.e. cannot consist of only - * expressions). + * This is not a generic routine, it expects the idxrel to be a btree or a hash, + * non-partial and have at least one column reference (i.e. cannot consist of + * only expressions). * * By definition, replication identity of a rel meets all limitations associated * with that. Note that any other index could also meet these limitations. @@ -77,6 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, Oid opfamily; RegProcedure regop; int table_attno = indkey->values[index_attoff]; + int eq_strategy; if (!AttributeNumberIsValid(table_attno)) { @@ -93,20 +148,22 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, */ optype = get_opclass_input_type(opclass->values[index_attoff]); opfamily = get_opclass_family(opclass->values[index_attoff]); + eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]); operator = get_opfamily_member(opfamily, optype, optype, - BTEqualStrategyNumber); + eq_strategy); + if (!OidIsValid(operator)) elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", - BTEqualStrategyNumber, optype, optype, opfamily); + eq_strategy, optype, optype, opfamily); regop = get_opcode(operator); /* Initialize the scankey. 
*/ ScanKeyInit(&skey[skey_attoff], index_attoff + 1, - BTEqualStrategyNumber, + eq_strategy, regop, searchslot->tts_values[table_attno - 1]); diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 57ad22b48a..5f5da4c9d8 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "access/amapi.h" #include "access/genam.h" #include "access/table.h" #include "catalog/namespace.h" @@ -779,8 +780,8 @@ RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap) /* * Returns the oid of an index that can be used by the apply worker to scan - * the relation. The index must be btree, non-partial, and have at least - * one column reference (i.e. cannot consist of only expressions). These + * the relation. The index must be btree or hash, non-partial, and have at + * least one column reference (i.e. cannot consist of only expressions). These * limitations help to keep the index scan similar to PK/RI index scans. * * Note that the limitations of index scans for replica identity full only @@ -841,11 +842,25 @@ FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap) bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo) { - bool is_btree = (indexInfo->ii_Am == BTREE_AM_OID); - bool is_partial = (indexInfo->ii_Predicate != NIL); - bool is_only_on_expression = IsIndexOnlyOnExpression(indexInfo); + IndexAmRoutine *routine; + bool is_suitable_type; + bool is_partial; + bool is_only_on_expression; - return is_btree && !is_partial && !is_only_on_expression; + /* + * XXX: Support only indexes which implement the amgettuple API. This is + * because RelationFindReplTupleByIndex() assumes it can fetch + * tuples one by one via that API. 
+ */ + routine = GetIndexAmRoutineByAmId(indexInfo->ii_Am, false); + is_suitable_type = ((routine->amgettuple != NULL) && + (get_equal_strategy_number_for_am(indexInfo->ii_Am) + != InvalidStrategy)); + + is_partial = (indexInfo->ii_Predicate != NIL); + is_only_on_expression = IsIndexOnlyOnExpression(indexInfo); + + return is_suitable_type && !is_partial && !is_only_on_expression; } /* diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 60978f9415..05813d98ae 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -1255,6 +1255,28 @@ get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype) return true; } +/* + * get_opclass_method + * + * Returns the OID of the index access method the operator class is for. + */ +Oid +get_opclass_method(Oid opclass) +{ + HeapTuple tp; + Form_pg_opclass cla_tup; + Oid result; + + tp = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for opclass %u", opclass); + cla_tup = (Form_pg_opclass) GETSTRUCT(tp); + + result = cla_tup->opcmethod; + ReleaseSysCache(tp); + return result; +} + /* ---------- OPERATOR CACHE ---------- */ /* diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index ac02247947..3a23510478 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -666,6 +666,9 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd); extern void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname); +extern int get_equal_strategy_number(Oid opclass); +extern int get_equal_strategy_number_for_am(Oid am); + /* * prototypes from functions in nodeModifyTable.c */ diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 4f5418b972..ea8fbe42cd 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -106,6 +106,7 @@ extern Oid 
get_opclass_family(Oid opclass); extern Oid get_opclass_input_type(Oid opclass); extern bool get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype); +extern Oid get_opclass_method(Oid opclass); extern RegProcedure get_opcode(Oid opno); extern char *get_opname(Oid opno); extern Oid get_op_rettype(Oid opno); diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index bd673a9d68..5fa5f0a790 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/034_hash.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/034_hash.pl b/src/test/subscription/t/034_hash.pl new file mode 100644 index 0000000000..3aead6958b --- /dev/null +++ b/src/test/subscription/t/034_hash.pl @@ -0,0 +1,93 @@ +# Copyright (c) 2022-2023, PostgreSQL Global Development Group + +# Test logical replication behavior with subscriber using available index +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +my $appname = 'tap_sub'; +my $result = ''; + +# ============================================================================= +# Testcase start: Subscription can use hash index +# + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (x int, y text)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (x int, y text)"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full USING HASH (x)"); + +# insert some initial data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_replica_id_full SELECT i, (i%10)::text FROM generate_series(0,10) i" +); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# delete 2 rows +$node_publisher->safe_psql('postgres', + "DELETE FROM test_replica_id_full WHERE x IN (5, 6)"); + +# update 2 rows +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full SET x = 100, y = '200' WHERE x IN (1, 2)"); + +# wait until the index is used on the subscriber +# XXX: the test will be suspended here +$node_publisher->wait_for_catchup($appname); +$node_subscriber->poll_query_until('postgres', + q{select (idx_scan = 4) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';} + ) + or die + "Timed out while waiting for check subscriber tap_sub_rep_full deletes 2 rows and updates 2 rows via index"; + +# make sure that the subscriber has the correct data after the UPDATE +$result = 
$node_subscriber->safe_psql('postgres', + "select count(*) from test_replica_id_full WHERE (x = 100 and y = '200')" +); +is($result, qq(2), + 'ensure subscriber has the correct data at the end of the test'); + +# make sure that the subscriber has the correct data after the DELETE +$result = $node_subscriber->safe_psql('postgres', + "select count(*) from test_replica_id_full where x in (5, 6)"); +is($result, qq(0), + 'ensure subscriber has the correct data at the end of the test'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); +# cleanup sub +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); + +# Testcase end: Subscription can use hash index +# ============================================================================= + +done_testing(); -- 2.27.0