From 07f9825641c6f64fb1fc475121df79ad361867be Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 4 Mar 2025 16:51:19 +0900 Subject: [PATCH v2 2/3] Introduce a new invalidation message to invalidate caches in output plugins --- src/backend/utils/cache/inval.c | 87 +++++++++++++++++++++++++++++++++ src/include/storage/sinval.h | 11 +++++ src/include/utils/inval.h | 10 ++++ 3 files changed, 108 insertions(+) diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 700ccb6df9..d793bc9281 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -271,6 +271,7 @@ int debug_discard_caches = 0; #define MAX_SYSCACHE_CALLBACKS 64 #define MAX_RELCACHE_CALLBACKS 10 +#define MAX_RELSYNC_CALLBACKS 10 static struct SYSCACHECALLBACK { @@ -292,6 +293,15 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static struct RELSYNCCALLBACK +{ + RelSyncCallbackFunction function; + Datum arg; +} relsync_callback_list[MAX_RELSYNC_CALLBACKS]; + +static int relsync_callback_count = 0; + + /* ---------------------------------------------------------------- * Invalidation subgroup support functions * ---------------------------------------------------------------- @@ -832,6 +842,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) else if (msg->sn.dbId == MyDatabaseId) InvalidateCatalogSnapshot(); } + else if (msg->id == SHAREDINVALRELSYNC_ID) + { + /* We only care about our own database */ + if (msg->rs.dbId == MyDatabaseId) + CallRelSyncCallbacks(msg->rs.relid); + } else elog(FATAL, "unrecognized SI message ID: %d", msg->id); } @@ -1695,6 +1711,42 @@ CacheInvalidateRelmap(Oid databaseId) } +/* + * RelationCacheInvalidate + * Register invalidation of the cache in logical decoding output plugin + * for a database. + * + * This type of invalidation message is used for the specific purpose of output + * plugins. Processes which do not decode WALs would do nothing even when it + * receives the message. + */ +void +CacheInvalidateRelSync(Oid relid) +{ + SharedInvalidationMessage msg; + + msg.rs.id = SHAREDINVALRELSYNC_ID; + msg.rs.dbId = MyDatabaseId; + msg.rs.relid = relid; + /* check AddCatcacheInvalidationMessage() for an explanation */ + VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); + + SendSharedInvalidMessages(&msg, 1); +} + + +/* + * CacheInvalidateRelSyncAll + * Register invalidation of the whole cache in logical decoding output + * plugin. + */ +void +CacheInvalidateRelSyncAll(void) +{ + CacheInvalidateRelSync(InvalidOid); +} + + /* * CacheRegisterSyscacheCallback * Register the specified function to be called for all future @@ -1763,6 +1815,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, ++relcache_callback_count; } +/* + * CacheRegisterRelSyncCallback + * Register the specified function to be called for all future + * decoding-cache invalidation events. + * + * This function is intended to be call from the logical decoding output + * plugins. + */ +void +CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg) +{ + if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS) + elog(FATAL, "out of relsync_callback_list slots"); + + relsync_callback_list[relsync_callback_count].function = func; + relsync_callback_list[relsync_callback_count].arg = arg; + + ++relsync_callback_count; +} + /* * CallSyscacheCallbacks * @@ -1788,6 +1861,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) } } +/* + * CallSyscacheCallbacks + */ +void +CallRelSyncCallbacks(Oid relid) +{ + for (int i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, relid); + } +} + /* * LogLogicalInvalidations * diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 2463c0f9fa..90a5af4ed8 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -27,6 +27,7 @@ * * invalidate an smgr cache entry for a specific physical relation * * invalidate the mapped-relation mapping for a given database * * invalidate any saved snapshot that might be used to scan a given relation + * * invalidate a specific entry for specific output plugin * More types could be added if needed. The message type is identified by * the first "int8" field of the message struct. Zero or positive means a * specific-catcache inval message (and also serves as the catcache ID field). @@ -110,6 +111,15 @@ typedef struct Oid relId; /* relation ID */ } SharedInvalSnapshotMsg; +#define SHAREDINVALRELSYNC_ID (-6) + +typedef struct +{ + int8 id; /* type field --- must be first */ + Oid dbId; /* database ID */ + Oid relid; /* relation ID, or 0 if whole relcache */ +} SharedInvalRelSyncMsg; + typedef union { int8 id; /* type field --- must be first */ @@ -119,6 +129,7 @@ typedef union SharedInvalSmgrMsg sm; SharedInvalRelmapMsg rm; SharedInvalSnapshotMsg sn; + SharedInvalRelSyncMsg rs; } SharedInvalidationMessage; diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index 40658ba2ff..5922306c11 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches; typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue); typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid); +typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid); extern void AcceptInvalidationMessages(void); @@ -59,6 +60,10 @@ extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator); extern void CacheInvalidateRelmap(Oid databaseId); +extern void CacheInvalidateRelSync(Oid relid); + +extern void CacheInvalidateRelSyncAll(void); + extern void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg); @@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid, extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg); +extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg); + extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); +extern void CallRelSyncCallbacks(Oid relid); + extern void InvalidateSystemCaches(void); extern void InvalidateSystemCachesExtended(bool debug_discard); -- 2.43.5