From 2bee57713c29b289f0cc3070cafdf45177f055dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Herrera?= Date: Tue, 7 Apr 2026 22:29:40 +0200 Subject: [PATCH 2/2] Publish list of tables being repacked in shared memory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use it in autovacuum to skip processing tables that are being repacked. This is mostly to avoid repeated attempts to process such tables, which would fail due to the special deadlock checker behavior for repack. Author: Álvaro Herrera Discussion: https://postgr.es/m/202604072027.gazfa2zt2l2j@alvherre.pgsql --- src/backend/commands/repack.c | 195 ++++++++++++++++-- src/backend/postmaster/autovacuum.c | 20 ++ .../utils/activity/wait_event_names.txt | 1 + src/include/commands/repack.h | 2 + src/include/storage/lwlocklist.h | 2 +- src/include/storage/subsystemlist.h | 1 + src/tools/pgindent/typedefs.list | 3 + 7 files changed, 210 insertions(+), 14 deletions(-) diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c index a5f5df77291..ee7072dce6a 100644 --- a/src/backend/commands/repack.c +++ b/src/backend/commands/repack.c @@ -63,9 +63,11 @@ #include "optimizer/optimizer.h" #include "pgstat.h" #include "storage/bufmgr.h" +#include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "storage/proc.h" +#include "storage/subsystems.h" #include "utils/acl.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -79,6 +81,32 @@ #include "utils/syscache.h" #include "utils/wait_event_types.h" + +/* Shared memory layout for REPACK */ +typedef struct RepackWorkerInfo +{ + bool ri_in_use; + pid_t ri_backendpid; + Oid ri_dbid; + Oid ri_relid; + Oid ri_toastrelid; +} RepackWorkerInfo; + +typedef struct +{ + bool re_useless; + RepackWorkerInfo re_workerinfo[FLEXIBLE_ARRAY_MEMBER]; +} RepackShmemStruct; + +static RepackShmemStruct *RepackShmem; + +typedef struct RepackCleanupContext +{ + bool concurrent; + int workerindex; +} RepackCleanupContext; + + /* * This struct is used to pass around the information on tables to be * clustered. We need this so we can make a list of them when invoked without @@ -90,6 +118,7 @@ typedef struct Oid indexOid; } RelToCluster; + /* * The first file exported by the decoding worker must contain a snapshot, the * following ones contain the data changes. @@ -166,6 +195,10 @@ static List *get_tables_to_repack_partitioned(RepackCommand cmd, MemoryContext permcxt); static bool repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid); +static void RepackCleanup(RepackCleanupContext *context); +static void RepackCleanupCb(int code, Datum arg); +static void RepackShmemRequest(void *arg); +static void RepackShmemInit(void *arg); static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt); static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot, @@ -210,6 +243,11 @@ static void ProcessRepackMessage(StringInfo msg); static const char *RepackCommandAsString(RepackCommand cmd); +const ShmemCallbacks RepackShmemCallbacks = { + .request_fn = RepackShmemRequest, + .init_fn = RepackShmemInit, +}; + /* * The repack code allows for processing multiple tables at once. Because * of this, we cannot just run everything on a single transaction, or we @@ -514,6 +552,7 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid tableOid = RelationGetRelid(OldHeap); Relation index; LOCKMODE lmode; + RepackCleanupContext context; Oid save_userid; int save_sec_context; int save_nestlevel; @@ -660,24 +699,43 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, TransferPredicateLocksToHeapRelation(OldHeap); /* rebuild_relation does all the dirty work */ - PG_TRY(); - { - rebuild_relation(OldHeap, index, verbose, ident_idx); - } - PG_FINALLY(); + context.concurrent = concurrent; + + PG_ENSURE_ERROR_CLEANUP(RepackCleanupCb, PointerGetDatum(&context)); { if (concurrent) { - /* - * Since during normal operation the worker was already asked to - * exit, stopping it explicitly is especially important on ERROR. - * However it still seems a good practice to make sure that the - * worker never survives the REPACK command. - */ - stop_repack_decoding_worker(); + bool freefound = false; + + LWLockAcquire(RepackLock, LW_EXCLUSIVE); + for (int i = 0; i < max_repack_replication_slots; i++) + { + RepackWorkerInfo *worker; + + if (RepackShmem->re_workerinfo[i].ri_in_use) + continue; + + freefound = true; + worker = &RepackShmem->re_workerinfo[i]; + context.workerindex = i; + + worker->ri_in_use = true; + worker->ri_backendpid = MyProcPid; + worker->ri_dbid = MyDatabaseId; + worker->ri_relid = RelationGetRelid(OldHeap); + worker->ri_toastrelid = OldHeap->rd_rel->reltoastrelid; + break; + } + if (!freefound) + elog(ERROR, "could not find free repack entry"); + LWLockRelease(RepackLock); } + + rebuild_relation(OldHeap, index, verbose, ident_idx); } - PG_END_TRY(); + PG_END_ENSURE_ERROR_CLEANUP(RepackCleanupCb, PointerGetDatum(&context)); + + RepackCleanup(&context); /* rebuild_relation closes OldHeap, and index if valid */ @@ -691,6 +749,117 @@ out: pgstat_progress_end_command(); } +/* + * Return whether any backend is running concurrent REPACK on the given table + * (which could be a toast table). + */ +bool +is_table_under_repack(Oid databaseId, Oid relid) +{ + bool retval = false; + + LWLockAcquire(RepackLock, LW_SHARED); + for (int i = 0; i < max_repack_replication_slots; i++) + { + RepackWorkerInfo *rworker; + + if (!RepackShmem->re_workerinfo[i].ri_in_use) + continue; + + rworker = &RepackShmem->re_workerinfo[i]; + if (rworker->ri_dbid == MyDatabaseId && + (rworker->ri_relid == relid || + rworker->ri_toastrelid == relid)) + retval = true; + } + LWLockRelease(RepackLock); + + return retval; +} + +/* + * Remove ourselves from the workerinfo array. + */ +static void +RepackCleanup(RepackCleanupContext *context) +{ + if (context->concurrent) + { + RepackWorkerInfo *worker; + + /* + * The worker would normally terminate on its own when the work is + * done, but make sure we signal it just in case. + */ + stop_repack_decoding_worker(); + + /* + * also, make sure we stop advertising the relation we were repacking, + * so that autovacuum reverts to handling it normally. + */ + LWLockAcquire(RepackLock, LW_EXCLUSIVE); + + worker = &RepackShmem->re_workerinfo[context->workerindex]; + Assert(worker->ri_backendpid == MyProcPid); + worker->ri_in_use = false; + worker->ri_backendpid = 0; + worker->ri_dbid = InvalidOid; + worker->ri_relid = InvalidOid; + worker->ri_toastrelid = InvalidOid; + LWLockRelease(RepackLock); + } +} + +/* + * RepackCleanup wrapped as an on_shmem_exit callback function + */ +static void +RepackCleanupCb(int code, Datum arg) +{ + RepackCleanup((RepackCleanupContext *) DatumGetPointer(arg)); +} + +/* + * RepackShmemRequest + * Register shared memory space needed for repack + */ +static void +RepackShmemRequest(void *arg) +{ + Size size; + + /* + * Need the fixed struct and the array of RepackWorkerInfo. + */ + size = sizeof(RepackShmemStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_repack_replication_slots, + sizeof(RepackWorkerInfo))); + + ShmemRequestStruct(.name = "Repack Data", + .size = size, + .ptr = (void **) &RepackShmem, + ); +} + +static void +RepackShmemInit(void *arg) +{ + RepackWorkerInfo *reinfo; + + reinfo = (RepackWorkerInfo *) ((char *) RepackShmem + + MAXALIGN(sizeof(RepackShmemStruct))); + + for (int i = 0; i < max_repack_replication_slots; i++) + { + reinfo[i].ri_in_use = false; + reinfo[i].ri_backendpid = 0; + reinfo[i].ri_dbid = InvalidOid; + reinfo[i].ri_relid = InvalidOid; + reinfo[i].ri_toastrelid = InvalidOid; + } +} + /* * Check if the table (and its index) still meets the requirements of * cluster_rel(). diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index bd626a16363..080c64ea3c8 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -78,6 +78,7 @@ #include "catalog/namespace.h" #include "catalog/pg_database.h" #include "catalog/pg_namespace.h" +#include "commands/repack.h" #include "commands/vacuum.h" #include "common/int.h" #include "funcapi.h" @@ -2422,6 +2423,25 @@ do_autovacuum(void) } } LWLockRelease(AutovacuumLock); + + /* + * Similarly, if the table is being processed by concurrent repack, + * skip it (but make a note of that). We wouldn't be able to acquire + * its lock anyway. + */ + if (!skipit) + { + MemoryContextSwitchTo(PortalContext); + + skipit = is_table_under_repack(MyDatabaseId, relid); + if (skipit) + ereport(LOG, + errmsg("skipping table \"%s.%s.%s\" because it's being repacked in concurrent mode", + get_database_name(MyDatabaseId), + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid))); + } + if (skipit) { LWLockRelease(AutovacuumScheduleLock); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 7bda5298558..e206304f204 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -332,6 +332,7 @@ SInvalWrite "Waiting to add a message to the shared catalog invalidation queue." WALBufMapping "Waiting to replace a page in WAL buffers." WALWrite "Waiting for WAL buffers to be written to disk." ControlFile "Waiting to read or update the pg_control file or create a new WAL file." +Repack "Waiting to read or update tables in process by concurrent repack." MultiXactGen "Waiting to read or update shared multixact state." RelCacheInit "Waiting to read or update a pg_internal.init relation cache initialization file." CheckpointerComm "Waiting to manage fsync requests." diff --git a/src/include/commands/repack.h b/src/include/commands/repack.h index fd16e74b179..be7d38b5fae 100644 --- a/src/include/commands/repack.h +++ b/src/include/commands/repack.h @@ -42,6 +42,8 @@ extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel); extern void cluster_rel(RepackCommand command, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel); +extern bool is_table_under_repack(Oid databaseId, Oid relid); + extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode); extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index af8553bcb6c..3f08f4a15d4 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -41,7 +41,7 @@ PG_LWLOCK(6, SInvalWrite) PG_LWLOCK(7, WALBufMapping) PG_LWLOCK(8, WALWrite) PG_LWLOCK(9, ControlFile) -/* 10 was CheckpointLock */ +PG_LWLOCK(10, Repack) /* 11 was XactSLRULock */ /* 12 was SubtransSLRULock */ PG_LWLOCK(13, MultiXactGen) diff --git a/src/include/storage/subsystemlist.h b/src/include/storage/subsystemlist.h index 9ad619080be..4e683b8b0a8 100644 --- a/src/include/storage/subsystemlist.h +++ b/src/include/storage/subsystemlist.h @@ -72,6 +72,7 @@ PG_SHMEM_SUBSYSTEM(WalSummarizerShmemCallbacks) PG_SHMEM_SUBSYSTEM(PgArchShmemCallbacks) PG_SHMEM_SUBSYSTEM(ApplyLauncherShmemCallbacks) PG_SHMEM_SUBSYSTEM(SlotSyncShmemCallbacks) +PG_SHMEM_SUBSYSTEM(RepackShmemCallbacks) /* other modules that need some shared memory space */ PG_SHMEM_SUBSYSTEM(BTreeShmemCallbacks) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 637c669a146..d019e03aaf1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2639,9 +2639,12 @@ ReorderBufferTupleCidEnt ReorderBufferTupleCidKey ReorderBufferUpdateProgressTxnCB ReorderTuple +RepackCleanupContext RepackCommand RepackDecodingState +RepackShmemStruct RepackStmt +RepackWorkerInfo ReparameterizeForeignPathByChild_function ReplOriginId ReplOriginXactState -- 2.47.3