diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 80540c017bd..2dc9c6bd91b 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -185,6 +185,8 @@ typedef struct RelationSyncEntry * row filter expressions, column list, etc. */ MemoryContext entry_cxt; + + dlist_node node; /* linked list pointers */ } RelationSyncEntry; /* @@ -218,11 +220,17 @@ typedef struct PGOutputTxnData /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; +/* least recently used entry list for RelationSyncCache. */ +static dclist_head rel_sync_cache_lru_list = DCLIST_STATIC_INIT(rel_sync_cache_lru_list); +/* The maximum number of entries in the RelationSyncCache. */ +int max_rel_sync_cache_size = 1024; static void init_rel_sync_cache(MemoryContext cachectx); -static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); +static void cleanup_streamed_txn(TransactionId xid, bool is_commit); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation); +static void cleanup_rel_sync_entry(RelationSyncEntry *entry); +static void maybe_cleanup_rel_sync_cache(void); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry); @@ -1892,7 +1900,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); - cleanup_rel_sync_cache(toptxn->xid, false); + cleanup_streamed_txn(toptxn->xid, false); } /* @@ -1919,7 +1927,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); - cleanup_rel_sync_cache(txn->xid, true); + cleanup_streamed_txn(txn->xid, true); } /* @@ -1968,6 +1976,8 @@ init_rel_sync_cache(MemoryContext cachectx) Assert(RelationSyncCache != NULL); + dclist_init(&rel_sync_cache_lru_list); + /* No more to do if we already registered callbacks */ if (relation_callbacks_registered) return; @@ -2059,6 +2069,22 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->publish_as_relid = InvalidOid; entry->columns = NULL; entry->attrmap = NULL; + + /* + * Since this is the most recently used entry, push this entry onto the + * end of the LRU list and check if there are any entries that need to + * be eliminated. + */ + dclist_push_tail(&rel_sync_cache_lru_list, &entry->node); + maybe_cleanup_rel_sync_cache(); + } + else + { + /* + * Move existing entry to the tail of the LRU list to mark it as the + * most recently used item. + */ + dclist_move_tail(&rel_sync_cache_lru_list, &entry->node); } /* Validate the entry */ @@ -2091,71 +2117,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) publications_valid = true; } - /* - * Reset schema_sent status as the relation definition may have - * changed. Also reset pubactions to empty in case rel was dropped - * from a publication. Also free any objects that depended on the - * earlier definition. - */ - entry->schema_sent = false; - entry->include_gencols_type = PUBLISH_GENCOLS_NONE; - list_free(entry->streamed_txns); - entry->streamed_txns = NIL; - bms_free(entry->columns); - entry->columns = NULL; - entry->pubactions.pubinsert = false; - entry->pubactions.pubupdate = false; - entry->pubactions.pubdelete = false; - entry->pubactions.pubtruncate = false; - - /* - * Tuple slots cleanups. (Will be rebuilt later if needed). - */ - if (entry->old_slot) - { - TupleDesc desc = entry->old_slot->tts_tupleDescriptor; - - Assert(desc->tdrefcount == -1); - - ExecDropSingleTupleTableSlot(entry->old_slot); - - /* - * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so - * do it now to avoid any leaks. - */ - FreeTupleDesc(desc); - } - if (entry->new_slot) - { - TupleDesc desc = entry->new_slot->tts_tupleDescriptor; - - Assert(desc->tdrefcount == -1); - - ExecDropSingleTupleTableSlot(entry->new_slot); - - /* - * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so - * do it now to avoid any leaks. - */ - FreeTupleDesc(desc); - } - - entry->old_slot = NULL; - entry->new_slot = NULL; - - if (entry->attrmap) - free_attrmap(entry->attrmap); - entry->attrmap = NULL; - - /* - * Row filter cache cleanups. - */ - if (entry->entry_cxt) - MemoryContextDelete(entry->entry_cxt); - - entry->entry_cxt = NULL; - entry->estate = NULL; - memset(entry->exprstate, 0, sizeof(entry->exprstate)); + /* Cleanup existing data */ + cleanup_rel_sync_entry(entry); /* * Build publication cache. We can't use one provided by relcache as @@ -2311,6 +2274,106 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) return entry; } +/* + * Cleanup attributes in the given entry to reuse it. + */ +static void +cleanup_rel_sync_entry(RelationSyncEntry *entry) +{ + /* + * Reset schema_sent status as the relation definition may have changed. + * Also reset pubactions to empty in case rel was dropped from a + * publication. Also free any objects that depended on the earlier + * definition. + */ + entry->schema_sent = false; + entry->include_gencols_type = PUBLISH_GENCOLS_NONE; + list_free(entry->streamed_txns); + entry->streamed_txns = NIL; + bms_free(entry->columns); + entry->columns = NULL; + entry->pubactions.pubinsert = false; + entry->pubactions.pubupdate = false; + entry->pubactions.pubdelete = false; + entry->pubactions.pubtruncate = false; + + /* + * Tuple slots cleanups. (Will be rebuilt later if needed). + */ + if (entry->old_slot) + { + TupleDesc desc = entry->old_slot->tts_tupleDescriptor; + + Assert(desc->tdrefcount == -1); + + ExecDropSingleTupleTableSlot(entry->old_slot); + + /* + * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do + * it now to avoid any leaks. + */ + FreeTupleDesc(desc); + } + if (entry->new_slot) + { + TupleDesc desc = entry->new_slot->tts_tupleDescriptor; + + Assert(desc->tdrefcount == -1); + + ExecDropSingleTupleTableSlot(entry->new_slot); + + /* + * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do + * it now to avoid any leaks. + */ + FreeTupleDesc(desc); + } + + entry->old_slot = NULL; + entry->new_slot = NULL; + + if (entry->attrmap) + free_attrmap(entry->attrmap); + entry->attrmap = NULL; + + /* + * Row filter cache cleanups. + */ + if (entry->entry_cxt) + MemoryContextDelete(entry->entry_cxt); + + entry->entry_cxt = NULL; + entry->estate = NULL; + memset(entry->exprstate, 0, sizeof(entry->exprstate)); +} + +/* + * Try to cleanup the Least Recently Used entry in the RelationSyncCache. + */ +static void +maybe_cleanup_rel_sync_cache(void) +{ + Assert(RelationSyncCache); + + if (dclist_count(&rel_sync_cache_lru_list) > max_rel_sync_cache_size) + { + RelationSyncEntry *entry; + + entry = dclist_head_element(RelationSyncEntry, node, + &rel_sync_cache_lru_list); + + dclist_delete_from(&rel_sync_cache_lru_list, &entry->node); + cleanup_rel_sync_entry(entry); + + /* Remove the etnry from the cache */ + if (hash_search(RelationSyncCache, + &entry->relid, + HASH_REMOVE, + NULL) == NULL) + elog(ERROR, "hash table corrupted"); + } +} + /* * Cleanup list of streamed transactions and update the schema_sent flag. * @@ -2323,7 +2386,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) * cache - so tweak the schema_sent flag accordingly. */ static void -cleanup_rel_sync_cache(TransactionId xid, bool is_commit) +cleanup_streamed_txn(TransactionId xid, bool is_commit) { HASH_SEQ_STATUS hash_seq; RelationSyncEntry *entry;