From 2bcd151f2e639c2aa7c5d5d917750b3e4e904428 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Mon, 27 Oct 2025 10:41:38 +0800 Subject: [PATCH vRefactoring] Replace Hash table with a List and eliminate invalidations --- .../replication/logical/sequencesync.c | 304 ++++++------------ src/include/catalog/pg_subscription_rel.h | 3 +- 2 files changed, 97 insertions(+), 210 deletions(-) diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 5610ebe252d..2de75c2e30d 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -60,11 +60,11 @@ #include "postgres.h" +#include "access/sequence.h" #include "access/table.h" #include "catalog/pg_sequence.h" #include "catalog/pg_subscription_rel.h" #include "commands/sequence.h" -#include "common/hashfn.h" #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" @@ -77,23 +77,21 @@ #include "utils/guc.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/rls.h" #include "utils/syscache.h" #include "utils/usercontext.h" -#define REMOTE_SEQ_COL_COUNT 11 +#define REMOTE_SEQ_COL_COUNT 10 typedef enum CopySeqResult { COPYSEQ_SUCCESS, COPYSEQ_MISMATCH, COPYSEQ_INSUFFICIENT_PERM, - COPYSEQ_SKIPPED } CopySeqResult; -static HTAB *sequences_to_copy = NULL; - /* * Handle sequence synchronization cooperation from the apply worker. * @@ -230,7 +228,7 @@ append_sequence_name(StringInfo buf, const char *nspname, const char *seqname, * publisher. */ static void -get_remote_sequence_info(TupleTableSlot *slot, LogicalRepSeqHashKey *key, +get_remote_sequence_info(TupleTableSlot *slot, int *seqidx, int64 *last_value, bool *is_called, XLogRecPtr *page_lsn, Oid *remote_typid, int64 *remote_start, int64 *remote_increment, @@ -240,10 +238,7 @@ get_remote_sequence_info(TupleTableSlot *slot, LogicalRepSeqHashKey *key, bool isnull; int col = 0; - key->nspname = TextDatumGetCString(slot_getattr(slot, ++col, &isnull)); - Assert(!isnull); - - key->seqname = TextDatumGetCString(slot_getattr(slot, ++col, &isnull)); + *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); *last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); @@ -281,30 +276,19 @@ get_remote_sequence_info(TupleTableSlot *slot, LogicalRepSeqHashKey *key, * Compare sequence parameters from publisher with local sequence. */ static CopySeqResult -validate_sequence(Relation sequence_rel, LogicalRepSequenceInfo *seqinfo, - Oid remote_typid, int64 remote_start, - int64 remote_increment, int64 remote_min, +validate_sequence(LogicalRepSequenceInfo *seqinfo, Oid remote_typid, + int64 remote_start, int64 remote_increment, int64 remote_min, int64 remote_max, bool remote_cycle) { Form_pg_sequence local_seq; HeapTuple tup; CopySeqResult result = COPYSEQ_SUCCESS; - /* Sequence was concurrently dropped */ - if (!sequence_rel) - return COPYSEQ_SKIPPED; - /* Sequence was concurrently dropped */ tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid)); if (!HeapTupleIsValid(tup)) - return COPYSEQ_SKIPPED; - - /* Sequence was concurrently invalidated */ - if (!seqinfo->entry_valid) - { - ReleaseSysCache(tup); - return COPYSEQ_SKIPPED; - } + elog(ERROR, "cache lookup failed for sequence %u", + seqinfo->localrelid); local_seq = (Form_pg_sequence) GETSTRUCT(tup); if (local_seq->seqtypid != remote_typid || @@ -329,6 +313,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, int64 last_value, UserContext ucxt; AclResult aclresult; bool run_as_owner = MySubscription->runasowner; + Oid seqoid = seqinfo->localrelid; /* * Make sure that the sequence is copied as table owner, unless the user @@ -337,7 +322,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, int64 last_value, if (!MySubscription->runasowner) SwitchToUntrustedUser(seqinfo->seqowner, &ucxt); - aclresult = pg_class_aclcheck(seqinfo->localrelid, GetUserId(), ACL_UPDATE); + aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); + if (aclresult != ACLCHECK_OK) { if (!run_as_owner) @@ -346,7 +332,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, int64 last_value, return COPYSEQ_INSUFFICIENT_PERM; } - SetSequence(seqinfo->localrelid, last_value, is_called); + SetSequence(seqoid, last_value, is_called); if (!run_as_owner) RestoreUserContext(&ucxt); @@ -356,7 +342,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, int64 last_value, * sequence as READY. The LSN represents the WAL position of the remote * sequence at the time it was synchronized. */ - UpdateSubscriptionRelState(MySubscription->oid, seqinfo->localrelid, + UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, page_lsn, false); return COPYSEQ_SUCCESS; @@ -366,65 +352,80 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, int64 last_value, * Copy existing data of sequences from the publisher. */ static void -copy_sequences(WalReceiverConn *conn, Oid subid) +copy_sequences(WalReceiverConn *conn, List *seqinfos) { - int total_seqs = hash_get_num_entries(sequences_to_copy); int current_index = 0; StringInfo mismatched_seqs = makeStringInfo(); StringInfo missing_seqs = makeStringInfo(); StringInfo insuffperm_seqs = makeStringInfo(); StringInfo seqstr = makeStringInfo(); StringInfo cmd = makeStringInfo(); - HASH_SEQ_STATUS status; - LogicalRepSequenceInfo *entry; #define MAX_SEQUENCES_SYNC_PER_BATCH 100 ereport(LOG, errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", - MySubscription->name, total_seqs)); + MySubscription->name, list_length(seqinfos))); - while (current_index < total_seqs) + while (current_index < list_length(seqinfos)) { - Oid seqRow[REMOTE_SEQ_COL_COUNT] = {TEXTOID, TEXTOID, INT8OID, + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; int batch_size = 0; int batch_succeeded_count = 0; int batch_mismatched_count = 0; - int batch_skipped_count = 0; int batch_insuffperm_count = 0; WalRcvExecResult *res; TupleTableSlot *slot; + ListCell *lc; StartTransactionCommand(); - hash_seq_init(&status, sequences_to_copy); - /* Collect a batch of sequences */ - while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) + for_each_from(lc, seqinfos, current_index) { - if (entry->remote_seq_queried) + Relation sequence_rel; + MemoryContext oldctx; + LogicalRepSequenceInfo *seqinfo = (LogicalRepSequenceInfo *) lfirst(lc); + + sequence_rel = try_table_open(seqinfo->localrelid, RowExclusiveLock); + + /* Skip if sequence was dropped concurrently */ + if (!sequence_rel) + { + seqinfos = foreach_delete_current(seqinfos, lc); continue; + } + + /* Save sequence info */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); + seqinfo->seqname = pstrdup(RelationGetRelationName(sequence_rel)); + seqinfo->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + seqinfo->seqowner = sequence_rel->rd_rel->relowner; + MemoryContextSwitchTo(oldctx); + + /* + * Hold the lock until transaction end to prevent concurrent + * sequence alter operation. + */ + table_close(sequence_rel, NoLock); if (seqstr->len > 0) appendStringInfoString(seqstr, ", "); - appendStringInfo(seqstr, "(\'%s\', \'%s\')", entry->nspname, entry->seqname); - entry->remote_seq_queried = true; + appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)", + seqinfo->nspname, seqinfo->seqname, + foreach_current_index(lc)); - batch_size++; - if (batch_size == MAX_SEQUENCES_SYNC_PER_BATCH || - (current_index + batch_size == total_seqs)) + if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH) break; } - hash_seq_term(&status); - appendStringInfo(cmd, - "SELECT s.schname, s.seqname, ps.*, seq.seqtypid,\n" + "SELECT s.seqidx, ps.*, seq.seqtypid,\n" " seq.seqstart, seq.seqincrement, seq.seqmin,\n" " seq.seqmax, seq.seqcycle\n" - "FROM ( VALUES %s ) AS s (schname, seqname)\n" + "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" "JOIN pg_namespace n ON n.nspname = s.schname\n" "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" @@ -441,20 +442,18 @@ copy_sequences(WalReceiverConn *conn, Oid subid) slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - LogicalRepSequenceInfo *seqinfo; - LogicalRepSeqHashKey key; int64 last_value; bool is_called; XLogRecPtr page_lsn; + int seqidx; Oid remote_typid; int64 remote_start; int64 remote_increment; int64 remote_min; int64 remote_max; bool remote_cycle; - bool found; CopySeqResult result; - Relation sequence_rel; + LogicalRepSequenceInfo *seqinfo; CHECK_FOR_INTERRUPTS(); @@ -464,54 +463,42 @@ copy_sequences(WalReceiverConn *conn, Oid subid) ProcessConfigFile(PGC_SIGHUP); } - get_remote_sequence_info(slot, &key, &last_value, &is_called, + get_remote_sequence_info(slot, &seqidx, &last_value, &is_called, &page_lsn, &remote_typid, &remote_start, &remote_increment, &remote_min, &remote_max, &remote_cycle); - seqinfo = hash_search(sequences_to_copy, &key, HASH_FIND, &found); - Assert(seqinfo); + seqinfo = (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx); + seqinfo->found_on_pub = true; - /* Try to open sequence */ - sequence_rel = try_table_open(seqinfo->localrelid, RowExclusiveLock); - - result = validate_sequence(sequence_rel, seqinfo, remote_typid, + result = validate_sequence(seqinfo, remote_typid, remote_start, remote_increment, remote_min, remote_max, remote_cycle); + if (result == COPYSEQ_SUCCESS) - result = copy_sequence(seqinfo, last_value, is_called, page_lsn); + result = copy_sequence(seqinfo, last_value, is_called, + page_lsn); switch (result) { + case COPYSEQ_SUCCESS: + batch_succeeded_count++; + + elog(DEBUG1, "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, seqinfo->nspname, seqinfo->seqname); + + break; case COPYSEQ_MISMATCH: - append_sequence_name(mismatched_seqs, key.nspname, - key.seqname, &batch_mismatched_count); + append_sequence_name(mismatched_seqs, seqinfo->nspname, + seqinfo->seqname, + &batch_mismatched_count); break; case COPYSEQ_INSUFFICIENT_PERM: - append_sequence_name(insuffperm_seqs, key.nspname, - key.seqname, &batch_insuffperm_count); - break; - case COPYSEQ_SKIPPED: - ereport(LOG, - errmsg("skip synchronization of sequence \"%s.%s\" because it has been altered or dropped concurrently", - key.nspname, key.seqname)); - batch_skipped_count++; - break; - default: - ereport(DEBUG1, - errmsg_internal("logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", - MySubscription->name, key.nspname, - key.seqname)); - batch_succeeded_count++; + append_sequence_name(insuffperm_seqs, seqinfo->nspname, + seqinfo->seqname, + &batch_insuffperm_count); break; } - - /* Remove processed sequence from the hash table. */ - if (!hash_search(sequences_to_copy, &key, HASH_REMOVE, NULL)) - elog(ERROR, "hash table corrupted"); - - if (sequence_rel) - table_close(sequence_rel, NoLock); } ExecDropSingleTupleTableSlot(slot); @@ -520,35 +507,26 @@ copy_sequences(WalReceiverConn *conn, Oid subid) resetStringInfo(cmd); ereport(LOG, - errmsg("logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d skipped, %d mismatched, %d insufficient permission, %d missing from publisher", + errmsg("logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher", MySubscription->name, (current_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, - batch_succeeded_count, batch_skipped_count, batch_mismatched_count, batch_insuffperm_count, - batch_size - (batch_succeeded_count + batch_skipped_count + batch_mismatched_count + batch_insuffperm_count))); + batch_succeeded_count, batch_mismatched_count, batch_insuffperm_count, + batch_size - (batch_succeeded_count + batch_mismatched_count + batch_insuffperm_count))); /* Commit this batch, and prepare for next batch */ CommitTransactionCommand(); /* - * current_indexes is not incremented sequentially because some + * current_index is not incremented sequentially because some * sequences may be missing, and the number of fetched rows may not - * match the batch size. The `hash_search` with HASH_REMOVE takes care - * of the count. + * match the batch size. */ current_index += batch_size; } - /* - * Any sequences remaining in the hash table were not found on the - * publisher. This is because they were included in a query - * (remote_seq_queried) but were not returned in the result set. - */ - hash_seq_init(&status, sequences_to_copy); - while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) - { - Assert(entry->remote_seq_queried); - append_sequence_name(missing_seqs, entry->nspname, entry->seqname, - NULL); - } + foreach_ptr(LogicalRepSequenceInfo, seqinfo, seqinfos) + if (!seqinfo->found_on_pub) + append_sequence_name(missing_seqs, seqinfo->nspname, + seqinfo->seqname, NULL); /* Report permission issues, mismatches, or missing sequences */ if (insuffperm_seqs->len || mismatched_seqs->len || missing_seqs->len) @@ -559,68 +537,6 @@ copy_sequences(WalReceiverConn *conn, Oid subid) destroyStringInfo(insuffperm_seqs); } -/* - * Relcache invalidation callback - */ -static void -sequencesync_list_invalidate_cb(Datum arg, Oid reloid) -{ - HASH_SEQ_STATUS status; - LogicalRepSequenceInfo *entry; - - /* Quick exit if no sequence is listed yet */ - if (hash_get_num_entries(sequences_to_copy) == 0) - return; - - if (reloid != InvalidOid) - { - hash_seq_init(&status, sequences_to_copy); - - while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) - { - if (entry->localrelid == reloid) - { - entry->entry_valid = false; - hash_seq_term(&status); - break; - } - } - } - else - { - /* invalidate all entries */ - hash_seq_init(&status, sequences_to_copy); - while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) - entry->entry_valid = false; - } -} - -static uint32 -LogicalRepSeqHash(const void *key, Size keysize) -{ - const LogicalRepSeqHashKey *k = (const LogicalRepSeqHashKey *) key; - uint32 h1 = string_hash(k->nspname, strlen(k->nspname)); - uint32 h2 = string_hash(k->seqname, strlen(k->seqname)); - - return h1 ^ h2; -} - -static int -LogicalRepSeqMatchFunc(const void *key1, const void *key2, Size keysize) -{ - int cmp; - const LogicalRepSeqHashKey *k1 = (const LogicalRepSeqHashKey *) key1; - const LogicalRepSeqHashKey *k2 = (const LogicalRepSeqHashKey *) key2; - - /* Compare by namespace name first */ - cmp = strcmp(k1->nspname, k2->nspname); - if (cmp != 0) - return cmp; - - /* If namespace names are equal, compare by sequence name */ - return strcmp(k1->seqname, k2->seqname); -} - /* * Start syncing the sequences in the sequencesync worker. */ @@ -635,21 +551,7 @@ LogicalRepSyncSequences(void) SysScanDesc scan; Oid subid = MyLogicalRepWorker->subid; StringInfoData app_name; - HASHCTL ctl; - bool found; - LogicalRepSequenceInfo *seq_entry; - - ctl.keysize = sizeof(LogicalRepSeqHashKey); - ctl.entrysize = sizeof(LogicalRepSequenceInfo); - ctl.hcxt = CacheMemoryContext; - ctl.hash = LogicalRepSeqHash; - ctl.match = LogicalRepSeqMatchFunc; - sequences_to_copy = hash_create("Logical replication sequences", 256, &ctl, - HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); - - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(sequencesync_list_invalidate_cb, - (Datum) 0); + List *seqinfos = NIL; StartTransactionCommand(); @@ -667,12 +569,12 @@ LogicalRepSyncSequences(void) scan = systable_beginscan(rel, InvalidOid, false, NULL, 2, skey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_subscription_rel subrel; char relkind; - Relation sequence_rel; - LogicalRepSeqHashKey key; + LogicalRepSequenceInfo *seq; MemoryContext oldctx; CHECK_FOR_INTERRUPTS(); @@ -684,32 +586,17 @@ LogicalRepSyncSequences(void) if (relkind != RELKIND_SEQUENCE) continue; - /* Skip if sequence was dropped concurrently */ - sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); - if (!sequence_rel) - continue; - - key.seqname = RelationGetRelationName(sequence_rel); - key.nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); - - /* Allocate the tracking info in a permanent memory context. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - - seq_entry = hash_search(sequences_to_copy, &key, HASH_ENTER, &found); - Assert(!found); - - memset(seq_entry, 0, sizeof(LogicalRepSequenceInfo)); + /* + * Worker needs to process sequences across transaction boundary, so + * allocate them under long-lived context. + */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); - seq_entry->seqname = pstrdup(key.seqname); - seq_entry->nspname = pstrdup(key.nspname); - seq_entry->localrelid = subrel->srrelid; - seq_entry->remote_seq_queried = false; - seq_entry->seqowner = sequence_rel->rd_rel->relowner; - seq_entry->entry_valid = true; + seq = palloc0_object(LogicalRepSequenceInfo); + seq->localrelid = subrel->srrelid; + seqinfos = lappend(seqinfos, seq); MemoryContextSwitchTo(oldctx); - - table_close(sequence_rel, RowExclusiveLock); } /* Cleanup */ @@ -718,6 +605,9 @@ LogicalRepSyncSequences(void) CommitTransactionCommand(); + if (!seqinfos) + return; + /* Is the use of a password mandatory? */ must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; @@ -741,9 +631,7 @@ LogicalRepSyncSequences(void) pfree(app_name.data); - /* If there are any sequences that need to be copied */ - if (hash_get_num_entries(sequences_to_copy)) - copy_sequences(LogRepWorkerWalRcvConn, subid); + copy_sequences(LogRepWorkerWalRcvConn, seqinfos); } /* diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index b42b05e6342..d06e28280cf 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -93,9 +93,8 @@ typedef struct LogicalRepSequenceInfo char *seqname; char *nspname; Oid localrelid; - bool remote_seq_queried; Oid seqowner; - bool entry_valid; + bool found_on_pub; } LogicalRepSequenceInfo; extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, -- 2.51.1.windows.1