From 9cd724e329591e037e9563ae5d28b811b0be8f91 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 11 Sep 2025 13:27:24 +0900 Subject: [PATCH v20251031 2/2] WIP: convert the hash table into shared one This patch extends the parallel apply to use the shared hash table for the dependency tracking. Currently entries are added only by the leader apply, and parallel apply worker can remove entries at the commit. --- .../replication/logical/applyparallelworker.c | 35 ++ src/backend/replication/logical/worker.c | 444 ++++++++++-------- .../utils/activity/wait_event_names.txt | 1 + src/include/replication/worker_internal.h | 18 + src/include/storage/lwlocklist.h | 1 + src/test/subscription/t/001_rep_changes.pl | 1 + 6 files changed, 310 insertions(+), 190 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index dccd221ad01..bc57fa298a1 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -298,6 +298,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL; /* A list to maintain subtransactions, if any. 
*/ static List *subxactlist = NIL; +/* A list of known replica identity keys */ +List *replica_identity_keys = NIL; + static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); @@ -428,6 +431,8 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) shared->parallel_apply_dsa_handle = parallel_apply_dsa_handle; shared->parallelized_txns_handle = parallelized_txns_handle; shared->has_dependent_txn = false; + shared->dependency_dsa_handle = DSA_HANDLE_INVALID; + shared->dependency_dshash_handle = DSHASH_HANDLE_INVALID; shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared); @@ -452,6 +457,10 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) winfo->dsm_seg = seg; winfo->shared = shared; + /* Set dependency hash table handler */ + atach_dependency_hash(&winfo->shared->dependency_dsa_handle, + &winfo->shared->dependency_dshash_handle); + return true; } @@ -1728,6 +1737,11 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); /* + * XXX: no need to remove dependency hash entries because it is not + * used for streamed transactions + */ + + /* * Release the lock as we might be processing an empty streaming * transaction in which case the lock won't be released during * transaction rollback. @@ -2062,6 +2076,9 @@ pa_commit_transaction(void) TransactionId xid = MyParallelShared->xid; bool has_dependent_txn; + /* Remove the transaction from the dependency hash table */ + dependency_cleanup(); + SpinLockAcquire(&MyParallelShared->mutex); MyParallelShared->xact_state = PARALLEL_TRANS_FINISHED; has_dependent_txn = MyParallelShared->has_dependent_txn; @@ -2123,3 +2140,21 @@ write_internal_relation(StringInfo s, LogicalRepRelation *rel) logicalrep_write_all_rels(s); } } + +/* + * Remember the given replica identity key. 
+ */ +void +remember_replica_identity_key(ReplicaIdentityKey *key) +{ + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + /* + * XXX Currently we do not take care the uniqueness. Same entry can be + * appended twice. + */ + replica_identity_keys = lappend(replica_identity_keys, key); + MemoryContextSwitchTo(oldctx); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8dd2e28522b..d62b783fc48 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -256,11 +256,13 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "common/hashfn.h" #include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/execPartition.h" +#include "lib/dshash.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "optimizer/optimizer.h" @@ -546,46 +548,45 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; -typedef struct ReplicaIdentityKey -{ - Oid relid; - LogicalRepTupleData *data; -} ReplicaIdentityKey; - +/* + * dshash entry; Holds last remote_xid that modified the tuple on the publisher. + */ typedef struct ReplicaIdentityEntry { - ReplicaIdentityKey *keydata; + ReplicaIdentityKey keydata; TransactionId remote_xid; - - /* needed for simplehash */ - uint32 hash; - char status; } ReplicaIdentityEntry; -#include "common/hashfn.h" +/* + * Build a hash value from the oid and replica identity columns. + * + * XXX: do we have to extend hash value? 
+ */ +static uint32 +build_hash(Oid relid, LogicalRepTupleData *data, LogicalRepRelMapEntry *relentry) +{ + int i; + uint32 hashkey = 0; + + hashkey = hash_combine(hashkey, hash_uint32(relid)); + + for (i = 0; i < data->ncols; i++) + { + uint32 hkey; + + if (data->colstatus[i] == LOGICALREP_COLUMN_NULL) + continue; + + if (!bms_is_member(i, relentry->remoterel.attkeys)) + continue; -static uint32 hash_replica_identity(ReplicaIdentityKey *key); -static bool hash_replica_identity_compare(ReplicaIdentityKey *a, - ReplicaIdentityKey *b); - -/* Define parameters for replica identity hash table code generation. */ -#define SH_PREFIX replica_identity -#define SH_ELEMENT_TYPE ReplicaIdentityEntry -#define SH_KEY_TYPE ReplicaIdentityKey * -#define SH_KEY keydata -#define SH_HASH_KEY(tb, key) hash_replica_identity(key) -#define SH_EQUAL(tb, a, b) hash_replica_identity_compare(a, b) -#define SH_STORE_HASH -#define SH_GET_HASH(tb, a) (a)->hash -#define SH_SCOPE static inline -#define SH_DECLARE -#define SH_DEFINE -#include "lib/simplehash.h" - -#define REPLICA_IDENTITY_INITIAL_SIZE 128 -#define REPLICA_IDENTITY_CLEANUP_THRESHOLD 1024 - -static replica_identity_hash *replica_identity_table = NULL; + hkey = hash_any((const unsigned char *) data->colvalues[i].data, + data->colvalues[i].len); + hashkey = hash_combine(hashkey, hkey); + } + + return hashkey; +} static void write_internal_dependencies(StringInfo s, List *depends_on_xids); @@ -669,135 +670,148 @@ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); static void replorigin_reset(int code, Datum arg); +static void dependency_dsa_detach(int code, Datum arg); +static void ensure_dependency_dshash(void); -/* - * Compute the hash value for entries in the replica_identity_table. 
- */ -static uint32 -hash_replica_identity(ReplicaIdentityKey *key) +/* parameters for the RI dependency shared hash table */ +static const dshash_parameters dependency_dsh_params = { - int i; - uint32 hashkey = 0; - - hashkey = hash_combine(hashkey, hash_uint32(key->relid)); - - for (i = 0; i < key->data->ncols; i++) - { - uint32 hkey; - - if (key->data->colstatus[i] == LOGICALREP_COLUMN_NULL) - continue; - - hkey = hash_any((const unsigned char *) key->data->colvalues[i].data, - key->data->colvalues[i].len); - hashkey = hash_combine(hashkey, hkey); - } + sizeof(ReplicaIdentityKey), + sizeof(ReplicaIdentityEntry), + dshash_memcmp, + dshash_memhash, + dshash_memcpy, + LWTRANCHE_DEPENDENCY_APPLY_DSA +}; - return hashkey; -} +static dsa_area *dependency_dsa_area = NULL; +static dshash_table *dependency_dshash = NULL; /* - * Compare two entries in the replica_identity_table. + * Allocate dependency hash table on the shared memory, or attach to it. + * + * It is always called by a leader apply worker first, then called by parallel + * workers. 
*/ -static bool -hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) +static void +ensure_dependency_dshash(void) { - if (a->relid != b->relid || - a->data->ncols != b->data->ncols) - return false; + MemoryContext oldcontext; - for (int i = 0; i < a->data->ncols; i++) - { - if (a->data->colstatus[i] != b->data->colstatus[i]) - return false; + /* Already initialized */ + if (dependency_dshash) + return; - if (a->data->colvalues[i].len != b->data->colvalues[i].len) - return false; + /* XXX: is it OK to switch the context only at creation/attach time? */ + oldcontext = MemoryContextSwitchTo(ApplyContext); - if (strcmp(a->data->colvalues[i].data, b->data->colvalues[i].data)) - return false; + /* Parallel apply worker should attach to existing dsa and dshash */ + if (am_parallel_apply_worker()) + { + Assert(MyParallelShared->dependency_dsa_handle != DSA_HANDLE_INVALID && + MyParallelShared->dependency_dshash_handle != DSHASH_HANDLE_INVALID); - elog(DEBUG1, "conflicting key %s", a->data->colvalues[i].data); + dependency_dsa_area = dsa_attach(MyParallelShared->dependency_dsa_handle); + dsa_pin_mapping(dependency_dsa_area); + dependency_dshash = dshash_attach(dependency_dsa_area, &dependency_dsh_params, + MyParallelShared->dependency_dshash_handle, NULL); + } + else + { + /* Leader apply worker should create dsa and dshash */ + dependency_dsa_area = dsa_create(LWTRANCHE_DEPENDENCY_APPLY_DSA); + dsa_pin(dependency_dsa_area); + dsa_pin_mapping(dependency_dsa_area); + dependency_dshash = dshash_create(dependency_dsa_area, &dependency_dsh_params, NULL); } - return true; + before_shmem_exit(dependency_dsa_detach, (Datum) 0); + MemoryContextSwitchTo(oldcontext); } /* - * Free resources associated with a replica identity key. + * Return the DSA and dshash handles of the dependency hash table. 
*/ -static void -free_replica_identity_key(ReplicaIdentityKey *key) +void +atach_dependency_hash(dsa_handle *out_dsa, dshash_table_handle *out_hash) { - Assert(key); + Assert(dependency_dsa_area && dependency_dshash); - pfree(key->data->colvalues); - pfree(key->data->colstatus); - pfree(key->data); - pfree(key); + *out_dsa = dsa_get_handle(dependency_dsa_area); + *out_hash = dshash_get_hash_table_handle(dependency_dshash); } /* * Clean up hash table entries associated with the given transaction IDs. + * + * XXX: do we have to retain this? Or it is enough done by parallel workers? */ static void cleanup_replica_identity_table(List *committed_xid) { - replica_identity_iterator i; + dshash_seq_status hstat; ReplicaIdentityEntry *rientry; + if (!dependency_dshash) + return; + if (!committed_xid) return; - replica_identity_start_iterate(replica_identity_table, &i); - while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL) + dshash_seq_init(&hstat, dependency_dshash, true); + while ((rientry = dshash_seq_next(&hstat)) != NULL) { if (!list_member_xid(committed_xid, rientry->remote_xid)) continue; /* Clean up the hash entry for committed transaction */ - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); + dshash_delete_current(&hstat); } + dshash_seq_term(&hstat); } /* - * Check committed transactions and clean up corresponding entries in the hash - * table. + * Remove all hash table entries handled by the worker. + * + * This is called when a transaction is committed by parallel apply workers. 
*/ -static void -cleanup_committed_replica_identity_entries(void) +void +dependency_cleanup(void) { - dlist_mutable_iter iter; - List *committed_xids = NIL; - - dlist_foreach_modify(iter, &lsn_mapping) - { - FlushPosition *pos = - dlist_container(FlushPosition, node, iter.cur); - bool skipped_write; + Assert(am_parallel_apply_worker()); - if (!TransactionIdIsValid(pos->pa_remote_xid) || - !XLogRecPtrIsInvalid(pos->local_end)) - continue; - - pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true, - &skipped_write); + /* + * Quick exit if dependency hash is not attached yet. + * + * XXX is it possible? + */ + if (!dependency_dshash) + return; - elog(DEBUG1, - "got commit end from parallel apply worker, " - "txn: %u, remote_end %X/%X, local_end %X/%X", - pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end), - LSN_FORMAT_ARGS(pos->local_end)); + /* + * Quick exit if no replication identity key is remembered. + * + * XXX is it possible? + */ + if (replica_identity_keys == NIL) + return; - if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end)) - continue; + /* Walk through the list to remove all related entries */ + foreach_ptr(ReplicaIdentityKey, rikey, replica_identity_keys) + { + /* + * XXX We cannot ensure that dshash entries were deleted. There is a + * case that same key was appended several times and no entry could be + * removed by the second try. + */ + dshash_delete_key(dependency_dshash, rikey); - committed_xids = lappend_xid(committed_xids, pos->pa_remote_xid); + replica_identity_keys = foreach_delete_current(replica_identity_keys, + rikey); } - /* cleanup the entries for committed transactions */ - cleanup_replica_identity_table(committed_xids); + list_free(replica_identity_keys); + replica_identity_keys = NIL; } /* @@ -833,39 +847,22 @@ check_and_append_xid_dependency(List *depends_on_xids, } /* - * Check for dependencies on preceding transactions that modify the same key. 
- * Returns the dependent transactions in 'depends_on_xids' and records the - * current change. + * Compute a hash key for the dependency hash table. + * + * Returns NULL in case of below, otherwise palloc'd ReplicaIdentityKey: + * - There are no replica identity columns + * - RI key is NULL or is explicitly marked unchanged */ -static void -check_dependency_on_replica_identity(Oid relid, - LogicalRepTupleData *original_data, - TransactionId new_depended_xid, - List **depends_on_xids) +static ReplicaIdentityKey * +compute_replca_identity_key(LogicalRepRelMapEntry *relentry, + LogicalRepTupleData *original_data) { - LogicalRepRelMapEntry *relentry; - LogicalRepTupleData *ridata; - ReplicaIdentityKey *rikey; - ReplicaIdentityEntry *rientry; - MemoryContext oldctx; int n_ri; - bool found = false; - - Assert(depends_on_xids); - - /* Search for existing entry */ - relentry = logicalrep_get_relentry(relid); + MemoryContext oldctx; + ReplicaIdentityKey *rikey; Assert(relentry); - /* - * First search whether any previous transaction has affected the whole - * table e.g., truncate or schema change from publisher. - */ - *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, - &relentry->last_depended_xid, - new_depended_xid); - n_ri = bms_num_members(relentry->remoterel.attkeys); /* @@ -874,7 +871,7 @@ check_dependency_on_replica_identity(Oid relid, * replica identity full. 
*/ if (!n_ri) - return; + return NULL; /* Check if the RI key value of the tuple is invalid */ for (int i = 0; i < original_data->ncols; i++) @@ -889,64 +886,85 @@ check_dependency_on_replica_identity(Oid relid, */ if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL || original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) - return; + return NULL; } oldctx = MemoryContextSwitchTo(ApplyContext); - /* Allocate space for replica identity values */ - ridata = palloc0_object(LogicalRepTupleData); - ridata->colvalues = palloc0_array(StringInfoData, n_ri); - ridata->colstatus = palloc0_array(char, n_ri); - ridata->ncols = n_ri; + rikey = palloc_object(ReplicaIdentityKey); + rikey->relid = relentry->remoterel.remoteid; + rikey->hash = build_hash(relentry->remoterel.remoteid, original_data, + relentry); - for (int i_original = 0, i_ri = 0; i_original < original_data->ncols; i_original++) - { - StringInfo original_colvalue = &original_data->colvalues[i_original]; + MemoryContextSwitchTo(oldctx); - if (!bms_is_member(i_original, relentry->remoterel.attkeys)) - continue; + return rikey; +} - initStringInfoExt(&ridata->colvalues[i_ri], original_colvalue->len + 1); - appendStringInfoString(&ridata->colvalues[i_ri], original_colvalue->data); - ridata->colstatus[i_ri] = original_data->colstatus[i_original]; - i_ri++; - } +/* + * Check for dependencies on preceding transactions that modify the same key. + * Returns the dependent transactions in 'depends_on_xids' and records the + * current change. 
+ */ +static void +check_dependency_on_replica_identity(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + ReplicaIdentityKey *rikey; + ReplicaIdentityEntry *rientry; + MemoryContext oldctx; + bool found = false; + + Assert(depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); - rikey = palloc0_object(ReplicaIdentityKey); - rikey->relid = relid; - rikey->data = ridata; + Assert(relentry); + + /* + * First search whether any previous transaction has affected the whole table + * e.g., truncate or schema change from publisher. + */ + *depends_on_xids = check_and_append_xid_dependency(*depends_on_xids, + &relentry->last_depended_xid, + new_depended_xid); + + /* Compute the hash key */ + rikey = compute_replca_identity_key(relentry, original_data); + + if (!rikey) + return; + + oldctx = MemoryContextSwitchTo(ApplyContext); if (TransactionIdIsValid(new_depended_xid)) { - rientry = replica_identity_insert(replica_identity_table, rikey, - &found); + rientry = dshash_find_or_insert(dependency_dshash, rikey, &found); - /* - * Release the key built to search the entry, if the entry already - * exists. Otherwise, initialize the remote_xid. 
- */ + /* Reuse the existing entry if found */ if (found) { elog(DEBUG1, "found conflicting replica identity change from %u", rientry->remote_xid); - - free_replica_identity_key(rikey); } else rientry->remote_xid = InvalidTransactionId; } else - { - rientry = replica_identity_lookup(replica_identity_table, rikey); - free_replica_identity_key(rikey); - } + rientry = dshash_find(dependency_dshash, rikey, true); MemoryContextSwitchTo(oldctx); /* Return if no entry found */ if (!rientry) + { + pfree(rikey); return; + } Assert(!found || TransactionIdIsValid(rientry->remote_xid)); @@ -969,9 +987,14 @@ check_dependency_on_replica_identity(Oid relid, */ else if (!TransactionIdIsValid(rientry->remote_xid)) { - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); + dshash_delete_entry(dependency_dshash, rientry); + rientry = NULL; } + + pfree(rikey); + + if (rientry) + dshash_release_lock(dependency_dshash, rientry); } /* @@ -982,25 +1005,30 @@ static void find_all_dependencies_on_rel(LogicalRepRelId relid, TransactionId new_depended_xid, List **depends_on_xids) { - replica_identity_iterator i; + dshash_seq_status hstat; ReplicaIdentityEntry *rientry; Assert(depends_on_xids); - replica_identity_start_iterate(replica_identity_table, &i); - while ((rientry = replica_identity_iterate(replica_identity_table, &i)) != NULL) + /* + * XXX We ensure dependency hash table exists even here, because relation + * message seems to be able to reach before the BEGIN/STREAM_START + * message. 
+ */ + ensure_dependency_dshash(); + + dshash_seq_init(&hstat, dependency_dshash, true); + while ((rientry = dshash_seq_next(&hstat)) != NULL) { Assert(TransactionIdIsValid(rientry->remote_xid)); - if (rientry->keydata->relid != relid) + if (rientry->keydata.relid != relid) continue; /* Clean up the hash entry for committed transaction while on it */ if (pa_transaction_committed(rientry->remote_xid)) { - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); - + dshash_delete_current(&hstat); continue; } @@ -1008,6 +1036,7 @@ find_all_dependencies_on_rel(LogicalRepRelId relid, TransactionId new_depended_x &rientry->remote_xid, new_depended_xid); } + dshash_seq_term(&hstat); } /* @@ -1082,14 +1111,6 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, if (!am_leader_apply_worker()) return; - if (!replica_identity_table) - replica_identity_table = replica_identity_create(ApplyContext, - REPLICA_IDENTITY_INITIAL_SIZE, - NULL); - - if (replica_identity_table->members >= REPLICA_IDENTITY_CLEANUP_THRESHOLD) - cleanup_committed_replica_identity_entries(); - switch (action) { case LOGICAL_REP_MSG_INSERT: @@ -1870,6 +1891,8 @@ apply_handle_begin(StringInfo s) maybe_start_skipping_changes(begin_data.final_lsn); + ensure_dependency_dshash(); + pa_allocate_worker(remote_xid, false); apply_action = get_transaction_apply_action(remote_xid, &winfo); @@ -2488,7 +2511,10 @@ apply_handle_stream_start(StringInfo s) /* Try to allocate a worker for the streaming transaction. 
*/ if (first_segment) + { + ensure_dependency_dshash(); pa_allocate_worker(stream_xid, true); + } apply_action = get_transaction_apply_action(stream_xid, &winfo); @@ -3381,6 +3407,7 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; bool run_as_owner; + ReplicaIdentityKey *rikey; /* * Quick return if we are skipping data modification changes or handling @@ -3405,6 +3432,10 @@ apply_handle_insert(StringInfo s) return; } + rikey = compute_replca_identity_key(rel, &newtup); + if (rikey) + remember_replica_identity_key(rikey); + /* * Make sure that any user-supplied code runs as the table owner, unless * the user has opted out of that behavior. @@ -3541,6 +3572,7 @@ apply_handle_update(StringInfo s) RTEPermissionInfo *target_perminfo; MemoryContext oldctx; bool run_as_owner; + ReplicaIdentityKey *rikey; /* * Quick return if we are skipping data modification changes or handling @@ -3566,6 +3598,17 @@ apply_handle_update(StringInfo s) return; } + if (has_oldtup) + { + rikey = compute_replca_identity_key(rel, &oldtup); + if (rikey) + remember_replica_identity_key(rikey); + } + + rikey = compute_replca_identity_key(rel, &newtup); + if (rikey) + remember_replica_identity_key(rikey); + /* Set relation for error callback */ apply_error_callback_arg.rel = rel; @@ -3760,6 +3803,7 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; bool run_as_owner; + ReplicaIdentityKey *rikey; /* * Quick return if we are skipping data modification changes or handling @@ -3784,6 +3828,10 @@ apply_handle_delete(StringInfo s) return; } + rikey = compute_replca_identity_key(rel, &oldtup); + if (rikey) + remember_replica_identity_key(rikey); + /* Set relation for error callback */ apply_error_callback_arg.rel = rel; @@ -6621,6 +6669,22 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } +/* + * Detach from dependency hash table + */ +static void +dependency_dsa_detach(int code, Datum arg) +{ + if (dependency_dshash) + 
{ + /* XXX: do we have to detach or destroy? */ + dshash_detach(dependency_dshash); + } + + if (dependency_dsa_area) + dsa_detach(dependency_dsa_area); +} + /* * Reset the origin state. */ diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 9c3737693ba..0755cac073c 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -402,6 +402,7 @@ XactSLRU "Waiting to access the transaction status SLRU cache." ParallelVacuumDSA "Waiting for parallel vacuum dynamic shared memory allocation." AioUringCompletion "Waiting for another process to complete IO via io_uring." ParallelApplyDSA "Waiting for parallel apply dynamic shared memory allocation." +DependencyApplyDSA "Waiting for worker dynamic shared memory allocation." # No "ABI_compatibility" region here as WaitEventLWLock has its own C code. diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index c70fae9efda..616e0ba8bec 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -25,6 +25,7 @@ #include "storage/shm_mq.h" #include "storage/shm_toc.h" #include "storage/spin.h" +#include "utils/dsa.h" /* Different types of worker */ typedef enum LogicalRepWorkerType @@ -200,6 +201,10 @@ typedef struct ParallelApplyWorkerShared dshash_table_handle parallelized_txns_handle; bool has_dependent_txn; + + /* Dependency hash table handles */ + dsa_handle dependency_dsa_handle; + dshash_table_handle dependency_dshash_handle; } ParallelApplyWorkerShared; /* @@ -239,6 +244,13 @@ typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; +/* dshash key; hash is computed from relid and replica identity columns */ +typedef struct ReplicaIdentityKey +{ + Oid relid; + uint32 hash; +} ReplicaIdentityKey; + /* Main memory context for apply worker. Permanent during worker lifetime. 
*/ extern PGDLLIMPORT MemoryContext ApplyContext; @@ -261,6 +273,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern PGDLLIMPORT List *replica_identity_keys; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, @@ -371,6 +385,10 @@ extern void pa_record_dependency_on_transactions(List *depends_on_xids); extern void pa_commit_transaction(void); extern void pa_wait_for_depended_transaction(TransactionId xid); +extern void atach_dependency_hash(dsa_handle *out_dsa, dshash_table_handle *out_hash); +extern void dependency_cleanup(void); +extern void remember_replica_identity_key(ReplicaIdentityKey *key); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index f461bd67827..8362fbf0b9d 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -136,3 +136,4 @@ PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU) PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA) PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion) PG_LWLOCKTRANCHE(PARALLEL_APPLY_DSA, ParallelApplyDSA) +PG_LWLOCKTRANCHE(DEPENDENCY_APPLY_DSA, DependencyApplyDSA) diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 430c1246d14..09f670a6785 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -16,6 +16,7 @@ $node_publisher->start; # Create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', 'max_logical_replication_workers = 10'); $node_subscriber->start; # Create some preexisting content on publisher -- 2.47.3